Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2b99ee56 authored by Corey Tabaka's avatar Corey Tabaka
Browse files

Support multiple consumer queues.

- Add support for importing posted buffers when spawing a new
  consumer queue.
- Correctly handle adding signaled buffers to epoll with edge
  triggered mode set.
- Add test for multi-consumer behavior.

Bug: 36401174
Test: buffer_hub_queue-test passes.
Change-Id: Id09f01502a1b18bf80a0ae465c2941b548cde2e4
parent 9d8bd095
Loading
Loading
Loading
Loading
+72 −20
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@
#include <inttypes.h>
#include <log/log.h>
#include <sys/epoll.h>
#include <poll.h>

#include <array>

@@ -11,6 +12,15 @@
#include <pdx/file_handle.h>
#include <private/dvr/bufferhub_rpc.h>

#define RETRY_EINTR(fnc_call)                 \
  ([&]() -> decltype(fnc_call) {              \
    decltype(fnc_call) result;                \
    do {                                      \
      result = (fnc_call);                    \
    } while (result == -1 && errno == EINTR); \
    return result;                            \
  })()

using android::pdx::ErrorStatus;
using android::pdx::LocalChannelHandle;
using android::pdx::Status;
@@ -113,8 +123,10 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {

  // Loop at least once to check for hangups.
  do {
    ALOGD_IF(TRACE, "BufferHubQueue::WaitForBuffers: count=%zu capacity=%zu",
             count(), capacity());
    ALOGD_IF(
        TRACE,
        "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
        id(), count(), capacity());

    // If there is already a buffer then just check for hangup without waiting.
    const int ret = epoll_fd_.Wait(events.data(), events.size(),
@@ -122,7 +134,9 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {

    if (ret == 0) {
      ALOGI_IF(TRACE,
               "BufferHubQueue::WaitForBuffers: No events before timeout.");
               "BufferHubQueue::WaitForBuffers: No events before timeout: "
               "queue_id=%d",
               id());
      return count() != 0;
    }

@@ -145,9 +159,9 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
               index);

      if (is_buffer_event_index(index)) {
        HandleBufferEvent(static_cast<size_t>(index), events[i]);
        HandleBufferEvent(static_cast<size_t>(index), events[i].events);
      } else if (is_queue_event_index(index)) {
        HandleQueueEvent(events[i]);
        HandleQueueEvent(events[i].events);
      } else {
        ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
              index);
@@ -158,29 +172,39 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
  return count() != 0;
}

void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
  auto buffer = buffers_[slot];
  if (!buffer) {
    ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
    return;
  }

  auto status = buffer->GetEventMask(event.events);
  auto status = buffer->GetEventMask(poll_events);
  if (!status) {
    ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
          status.GetErrorMessage().c_str());
    return;
  }

  int events = status.get();
  const int events = status.get();
  if (events & EPOLLIN) {
    int ret = OnBufferReady(buffer, &fences_[slot]);
    if (ret < 0) {
      ALOGE("BufferHubQueue::HandleBufferEvent: Failed to set buffer ready: %s",
            strerror(-ret));
      return;
    }
    const int ret = OnBufferReady(buffer, &fences_[slot]);
    if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
      // Only enqueue the buffer if it moves to or is already in the state
      // requested in OnBufferReady(). If the buffer is busy this means that the
      // buffer moved from released to posted when a new consumer was created
      // before the ProducerQueue had a chance to regain it. This is a valid
      // transition that we have to handle because edge triggered poll events
      // latch the ready state even if it is later de-asserted -- don't enqueue
      // or print an error log in this case.
      if (ret != -EBUSY)
        Enqueue(buffer, slot);
    } else {
      ALOGE(
          "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
          "queue_id=%d buffer_id=%d: %s",
          id(), buffer->id(), strerror(-ret));
    }
  } else if (events & EPOLLHUP) {
    // This might be caused by producer replacing an existing buffer slot, or
    // when BufferHubQueue is shutting down. For the first case, currently the
@@ -203,15 +227,15 @@ void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {
  }
}

void BufferHubQueue::HandleQueueEvent(const epoll_event& event) {
  auto status = GetEventMask(event.events);
void BufferHubQueue::HandleQueueEvent(int poll_event) {
  auto status = GetEventMask(poll_event);
  if (!status) {
    ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
          status.GetErrorMessage().c_str());
    return;
  }

  int events = status.get();
  const int events = status.get();
  if (events & EPOLLIN) {
    // Note that after buffer imports, if |count()| still returns 0, epoll
    // wait will be tried again to acquire the newly imported buffer.
@@ -224,7 +248,7 @@ void BufferHubQueue::HandleQueueEvent(const epoll_event& event) {
    ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
    hung_up_ = true;
  } else {
    ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%d", events);
    ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
  }
}

@@ -407,6 +431,8 @@ int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,

int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
                             size_t slot) {
  ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
           id(), buf->id(), slot);
  // For producer buffer, we need to enqueue the newly added buffer
  // immediately. Producer queue starts with all buffers in available state.
  const int ret = BufferHubQueue::AddBuffer(buf, slot);
@@ -447,6 +473,8 @@ Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(

int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                 LocalHandle* release_fence) {
  ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
           id(), buf->id());
  auto buffer = std::static_pointer_cast<BufferProducer>(buf);
  return buffer->Gain(release_fence);
}
@@ -525,8 +553,30 @@ Status<size_t> ConsumerQueue::ImportBuffers() {

int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
                             size_t slot) {
  // Consumer queue starts with all buffers in unavailable state.
  return BufferHubQueue::AddBuffer(buf, slot);
  ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
           id(), buf->id(), slot);
  const int ret = BufferHubQueue::AddBuffer(buf, slot);
  if (ret < 0)
    return ret;

  // Check to see if the buffer is already signaled. This is necessary to catch
  // cases where buffers are already available; epoll edge triggered mode does
  // not fire until and edge transition when adding new buffers to the epoll
  // set.
  const int kTimeoutMs = 0;
  pollfd pfd{buf->event_fd(), POLLIN, 0};
  const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
  if (count < 0) {
    const int error = errno;
    ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
          strerror(errno));
    return -error;
  }

  if (count == 1)
    HandleBufferEvent(slot, pfd.revents);

  return 0;
}

Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
@@ -558,6 +608,8 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(

int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                                 LocalHandle* acquire_fence) {
  ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
           id(), buf->id());
  auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
  return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
}
+23 −2
Original line number Diff line number Diff line
@@ -136,8 +136,8 @@ class BufferHubQueue : public pdx::Client {

  // Wait for buffers to be released and re-add them to the queue.
  bool WaitForBuffers(int timeout);
  void HandleBufferEvent(size_t slot, const epoll_event& event);
  void HandleQueueEvent(const epoll_event& event);
  void HandleBufferEvent(size_t slot, int poll_events);
  void HandleQueueEvent(int poll_events);

  virtual int OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
                            LocalHandle* fence) = 0;
@@ -308,6 +308,14 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
    return BASE::Create(sizeof(Meta), usage_set_mask, usage_clear_mask,
                        usage_deny_set_mask, usage_deny_clear_mask);
  }
  static std::unique_ptr<ProducerQueue> Create(size_t meta_size_bytes,
                                               uint32_t usage_set_mask,
                                               uint32_t usage_clear_mask,
                                               uint32_t usage_deny_set_mask,
                                               uint32_t usage_deny_clear_mask) {
    return BASE::Create(meta_size_bytes, usage_set_mask, usage_clear_mask,
                        usage_deny_set_mask, usage_deny_clear_mask);
  }

  // Import a |ProducerQueue| from a channel handle.
  static std::unique_ptr<ProducerQueue> Import(LocalChannelHandle handle) {
@@ -362,6 +370,19 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
                    LocalHandle* release_fence) override;
};

// Explicit specializations of ProducerQueue::Create for void metadata type.
template <>
inline std::unique_ptr<ProducerQueue> ProducerQueue::Create<void>() {
  return ProducerQueue::Create(0);
}
template <>
inline std::unique_ptr<ProducerQueue> ProducerQueue::Create<void>(
    uint32_t usage_set_mask, uint32_t usage_clear_mask,
    uint32_t usage_deny_set_mask, uint32_t usage_deny_clear_mask) {
  return ProducerQueue::Create(0, usage_set_mask, usage_clear_mask,
                               usage_deny_set_mask, usage_deny_clear_mask);
}

class ConsumerQueue : public BufferHubQueue {
 public:
  // Get a buffer consumer. Note that the method doesn't check whether the
+93 −8
Original line number Diff line number Diff line
@@ -22,20 +22,33 @@ constexpr int kBufferSliceCount = 1; // number of slices in each buffer
class BufferHubQueueTest : public ::testing::Test {
 public:
  template <typename Meta>
  bool CreateQueues(int usage_set_mask = 0, int usage_clear_mask = 0,
                    int usage_deny_set_mask = 0,
                    int usage_deny_clear_mask = 0) {
  bool CreateProducerQueue(uint64_t usage_set_mask = 0,
                           uint64_t usage_clear_mask = 0,
                           uint64_t usage_deny_set_mask = 0,
                           uint64_t usage_deny_clear_mask = 0) {
    producer_queue_ =
        ProducerQueue::Create<Meta>(usage_set_mask, usage_clear_mask,
                                    usage_deny_set_mask, usage_deny_clear_mask);
    if (!producer_queue_)
      return false;
    return producer_queue_ != nullptr;
  }

  bool CreateConsumerQueue() {
    if (producer_queue_) {
      consumer_queue_ = producer_queue_->CreateConsumerQueue();
    if (!consumer_queue_)
      return consumer_queue_ != nullptr;
    } else {
      return false;
    }
  }

    return true;
  template <typename Meta>
  bool CreateQueues(int usage_set_mask = 0, int usage_clear_mask = 0,
                    int usage_deny_set_mask = 0,
                    int usage_deny_clear_mask = 0) {
    return CreateProducerQueue<Meta>(usage_set_mask, usage_clear_mask,
                                     usage_deny_set_mask,
                                     usage_deny_clear_mask) &&
           CreateConsumerQueue();
  }

  void AllocateBuffer() {
@@ -134,6 +147,78 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) {
  }
}

TEST_F(BufferHubQueueTest, TestMultipleConsumers) {
  ASSERT_TRUE(CreateProducerQueue<void>());

  // Allocate buffers.
  const size_t kBufferCount = 4u;
  for (size_t i = 0; i < kBufferCount; i++) {
    AllocateBuffer();
  }
  ASSERT_EQ(kBufferCount, producer_queue_->count());

  // Build a silent consumer queue to test multi-consumer queue features.
  auto silent_queue = producer_queue_->CreateSilentConsumerQueue();
  ASSERT_NE(nullptr, silent_queue);

  // Check that buffers are correctly imported on construction.
  EXPECT_EQ(kBufferCount, silent_queue->capacity());

  // Dequeue and post a buffer.
  size_t slot;
  LocalHandle fence;
  auto producer_status = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_TRUE(producer_status.ok());
  auto producer_buffer = producer_status.take();
  ASSERT_NE(nullptr, producer_buffer);
  ASSERT_EQ(0, producer_buffer->Post<void>({}));

  // Currently we expect no buffer to be available prior to calling
  // WaitForBuffers/HandleQueueEvents.
  // TODO(eieio): Note this behavior may change in the future.
  EXPECT_EQ(0u, silent_queue->count());
  EXPECT_FALSE(silent_queue->HandleQueueEvents());
  EXPECT_EQ(0u, silent_queue->count());

  // Build a new consumer queue to test multi-consumer queue features.
  consumer_queue_ = silent_queue->CreateConsumerQueue();
  ASSERT_NE(nullptr, consumer_queue_);

  // Check that buffers are correctly imported on construction.
  EXPECT_EQ(kBufferCount, consumer_queue_->capacity());
  EXPECT_EQ(1u, consumer_queue_->count());

  // Reclaim released/ignored buffers.
  producer_queue_->HandleQueueEvents();
  ASSERT_EQ(kBufferCount - 1, producer_queue_->count());

  // Post another buffer.
  producer_status = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_TRUE(producer_status.ok());
  producer_buffer = producer_status.take();
  ASSERT_NE(nullptr, producer_buffer);
  ASSERT_EQ(0, producer_buffer->Post<void>({}));

  // Verify that the consumer queue receives it.
  EXPECT_EQ(1u, consumer_queue_->count());
  EXPECT_TRUE(consumer_queue_->HandleQueueEvents());
  EXPECT_EQ(2u, consumer_queue_->count());

  // Dequeue and acquire/release (discard) buffers on the consumer end.
  auto consumer_status = consumer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_TRUE(consumer_status.ok());
  auto consumer_buffer = consumer_status.take();
  ASSERT_NE(nullptr, consumer_buffer);
  consumer_buffer->Discard();

  // Buffer should be returned to the producer queue without being handled by
  // the silent consumer queue.
  EXPECT_EQ(1u, consumer_queue_->count());
  EXPECT_EQ(kBufferCount - 2, producer_queue_->count());
  EXPECT_TRUE(producer_queue_->HandleQueueEvents());
  EXPECT_EQ(kBufferCount - 1, producer_queue_->count());
}

struct TestMetadata {
  char a;
  int32_t b;