Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f09f63f7 authored by Tianyu Jiang's avatar Tianyu Jiang
Browse files

Let producer gain posted buffer if no released buffer exist.

ProducerQueue::Dequeue functionality does not change with the default
param gain_posted_buffer = false. It does the following otherwise:
1. try to gain a released buffer and return.
2. try to find the oldest posted buffer and return.

Bug: 80164475
Test: on Taimen with the following tests
buffer_hub-test buffer_hub_queue-test buffer_hub_queue_producer-test
dvr_api-test dvr_buffer_queue-test dvr_display-test

Change-Id: I6b27740388e53348024054396b784dbc6c08ca7e
parent cc8d6c1a
Loading
Loading
Loading
Loading
+109 −61
Original line number Diff line number Diff line
@@ -69,7 +69,7 @@ void BufferHubQueue::Initialize() {
      .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
  ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
  if (ret < 0) {
    ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
    ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__,
          strerror(-ret));
  }
}
@@ -77,7 +77,7 @@ void BufferHubQueue::Initialize() {
Status<void> BufferHubQueue::ImportQueue() {
  auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
  if (!status) {
    ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
    ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
          status.GetErrorMessage().c_str());
    return ErrorStatus(status.error());
  } else {
@@ -136,9 +136,7 @@ BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
      consumer_queue->GetChannel()->TakeChannelParcelable());

  if (!queue_parcelable.IsValid()) {
    ALOGE(
        "BufferHubQueue::CreateConsumerQueueParcelable: Failed to create "
        "consumer queue parcelable.");
    ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__);
    return ErrorStatus(EINVAL);
  }

@@ -169,8 +167,7 @@ bool BufferHubQueue::WaitForBuffers(int timeout) {
    }

    if (ret < 0 && ret != -EINTR) {
      ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
            strerror(-ret));
      ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret));
      return false;
    }

@@ -264,14 +261,14 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
    // wait will be tried again to acquire the newly imported buffer.
    auto buffer_status = OnBufferAllocated();
    if (!buffer_status) {
      ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
      ALOGE("%s: Failed to import buffer: %s", __FUNCTION__,
            buffer_status.GetErrorMessage().c_str());
    }
  } else if (events & EPOLLHUP) {
    ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
    ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__);
    hung_up_ = true;
  } else {
    ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
    ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events);
  }

  return {};
@@ -279,12 +276,11 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {

Status<void> BufferHubQueue::AddBuffer(
    const std::shared_ptr<BufferHubBase>& buffer, size_t slot) {
  ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
           buffer->id(), slot);
  ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(),
           slot);

  if (is_full()) {
    ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
          capacity_);
    ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_);
    return ErrorStatus(E2BIG);
  }

@@ -303,7 +299,7 @@ Status<void> BufferHubQueue::AddBuffer(
    const int ret =
        epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
    if (ret < 0) {
      ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
      ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__,
            strerror(-ret));
      return ErrorStatus(-ret);
    }
@@ -315,16 +311,14 @@ Status<void> BufferHubQueue::AddBuffer(
}

Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
  ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
  ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot);

  if (buffers_[slot]) {
    for (const auto& event_source : buffers_[slot]->GetEventSources()) {
      const int ret =
          epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
      if (ret < 0) {
        ALOGE(
            "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
            "set: %s",
        ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__,
              strerror(-ret));
        return ErrorStatus(-ret);
      }
@@ -345,23 +339,31 @@ Status<void> BufferHubQueue::Enqueue(Entry entry) {
  if (!is_full()) {
    available_buffers_.push(std::move(entry));

    // Find and remove the enqueued buffer from unavailable_buffers_slot if
    // exist.
    auto enqueued_buffer_iter = std::find_if(
        unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(),
        [&entry](size_t slot) -> bool { return slot == entry.slot; });
    if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) {
      unavailable_buffers_slot_.erase(enqueued_buffer_iter);
    }

    // Trigger OnBufferAvailable callback if registered.
    if (on_buffer_available_)
      on_buffer_available_();

    return {};
  } else {
    ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
    ALOGE("%s: Buffer queue is full!", __FUNCTION__);
    return ErrorStatus(E2BIG);
  }
}

Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout,
                                                               size_t* slot) {
  ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
           timeout);
  ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout);

  PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count());
  PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count());

  if (count() == 0) {
    if (!WaitForBuffers(timeout))
@@ -376,6 +378,7 @@ Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout,
  *slot = entry.slot;

  available_buffers_.pop();
  unavailable_buffers_slot_.push_back(*slot);

  return {std::move(buffer)};
}
@@ -564,7 +567,7 @@ Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
  auto status =
      InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
  if (!status) {
    ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s",
    ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__,
          status.GetErrorMessage().c_str());
    return status.error_status();
  }
@@ -580,31 +583,81 @@ Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(

pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
    int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
    pdx::LocalHandle* release_fence) {
    pdx::LocalHandle* release_fence, bool gain_posted_buffer) {
  ATRACE_NAME("ProducerQueue::Dequeue");
  if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
    ALOGE("ProducerQueue::Dequeue: Invalid parameter.");
    ALOGE("%s: Invalid parameter.", __FUNCTION__);
    return ErrorStatus(EINVAL);
  }

  auto status = BufferHubQueue::Dequeue(timeout, slot);
  if (!status)
    return status.error_status();

  auto buffer = std::static_pointer_cast<BufferProducer>(status.take());
  const int ret = buffer->GainAsync(out_meta, release_fence);
  std::shared_ptr<BufferProducer> buffer;
  Status<std::shared_ptr<BufferHubBase>> dequeue_status =
      BufferHubQueue::Dequeue(timeout, slot);
  if (dequeue_status.ok()) {
    buffer = std::static_pointer_cast<BufferProducer>(dequeue_status.take());
  } else {
    if (gain_posted_buffer) {
      Status<std::shared_ptr<BufferProducer>> dequeue_unacquired_status =
          ProducerQueue::DequeueUnacquiredBuffer(slot);
      if (!dequeue_unacquired_status.ok()) {
        ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__,
              dequeue_unacquired_status.error());
        return dequeue_unacquired_status.error_status();
      }
      buffer = dequeue_unacquired_status.take();
    } else {
      return dequeue_status.error_status();
    }
  }
  const int ret =
      buffer->GainAsync(out_meta, release_fence, gain_posted_buffer);
  if (ret < 0 && ret != -EALREADY)
    return ErrorStatus(-ret);

  return {std::move(buffer)};
}

Status<std::shared_ptr<BufferProducer>> ProducerQueue::DequeueUnacquiredBuffer(
    size_t* slot) {
  if (unavailable_buffers_slot_.size() < 1) {
    ALOGE(
        "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in "
        "acquired state if exist.",
        __FUNCTION__);
    return ErrorStatus(ENOMEM);
  }

  // Find the first buffer that is not in acquired state from
  // unavailable_buffers_slot_.
  for (auto iter = unavailable_buffers_slot_.begin();
       iter != unavailable_buffers_slot_.end(); iter++) {
    std::shared_ptr<BufferProducer> buffer = ProducerQueue::GetBuffer(*iter);
    if (buffer == nullptr) {
      ALOGE("%s failed. Buffer slot %d is  null.", __FUNCTION__,
            static_cast<int>(*slot));
      return ErrorStatus(EIO);
    }
    if (!BufferHubDefs::IsBufferAcquired(buffer->buffer_state())) {
      *slot = *iter;
      unavailable_buffers_slot_.erase(iter);
      unavailable_buffers_slot_.push_back(*slot);
      ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d",
            __FUNCTION__, static_cast<int>(*slot));
      return {std::move(buffer)};
    }
  }
  ALOGE(
      "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.",
      __FUNCTION__);
  return ErrorStatus(EBUSY);
}

pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
  if (capacity() != 0) {
    ALOGE(
        "ProducerQueue::TakeAsParcelable: producer queue can only be taken out"
        " as a parcelable when empty. Current queue capacity: %zu",
        capacity());
        "%s: producer queue can only be taken out as a parcelable when empty. "
        "Current queue capacity: %zu",
        __FUNCTION__, capacity());
    return ErrorStatus(EINVAL);
  }

@@ -628,17 +681,16 @@ ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
    : BufferHubQueue(std::move(handle)) {
  auto status = ImportQueue();
  if (!status) {
    ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
    ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
          status.GetErrorMessage().c_str());
    Close(-status.error());
  }

  auto import_status = ImportBuffers();
  if (import_status) {
    ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
          import_status.get());
    ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get());
  } else {
    ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
    ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
          import_status.GetErrorMessage().c_str());
  }
}
@@ -647,13 +699,10 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
  auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
  if (!status) {
    if (status.error() == EBADR) {
      ALOGI(
          "ConsumerQueue::ImportBuffers: Queue is silent, no buffers "
          "imported.");
      ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__);
      return {0};
    } else {
      ALOGE(
          "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
      ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__,
            status.GetErrorMessage().c_str());
      return status.error_status();
    }
@@ -665,13 +714,13 @@ Status<size_t> ConsumerQueue::ImportBuffers() {

  auto buffer_handle_slots = status.take();
  for (auto& buffer_handle_slot : buffer_handle_slots) {
    ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
    ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__,
             buffer_handle_slot.first.value());

    std::unique_ptr<BufferConsumer> buffer_consumer =
        BufferConsumer::Import(std::move(buffer_handle_slot.first));
    if (!buffer_consumer) {
      ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu",
      ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__,
            buffer_handle_slot.second);
      last_error = ErrorStatus(EPIPE);
      continue;
@@ -680,7 +729,7 @@ Status<size_t> ConsumerQueue::ImportBuffers() {
    auto add_status =
        AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
    if (!add_status) {
      ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
      ALOGE("%s: Failed to add buffer: %s", __FUNCTION__,
            add_status.GetErrorMessage().c_str());
      last_error = add_status;
    } else {
@@ -696,8 +745,8 @@ Status<size_t> ConsumerQueue::ImportBuffers() {

Status<void> ConsumerQueue::AddBuffer(
    const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
  ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
           id(), buffer->id(), slot);
  ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(),
           buffer->id(), slot);
  return BufferHubQueue::AddBuffer(buffer, slot);
}

@@ -706,9 +755,9 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
    LocalHandle* acquire_fence) {
  if (user_metadata_size != user_metadata_size_) {
    ALOGE(
        "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
        "does not match metadata size (%zu) for the queue.",
        user_metadata_size, user_metadata_size_);
        "%s: Metadata size (%zu) for the dequeuing buffer does not match "
        "metadata size (%zu) for the queue.",
        __FUNCTION__, user_metadata_size, user_metadata_size_);
    return ErrorStatus(EINVAL);
  }

@@ -723,7 +772,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
    if (metadata_src) {
      memcpy(meta, metadata_src, user_metadata_size);
    } else {
      ALOGW("ConsumerQueue::Dequeue: no user-defined metadata.");
      ALOGW("%s: no user-defined metadata.", __FUNCTION__);
    }
  }

@@ -735,7 +784,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
    pdx::LocalHandle* acquire_fence) {
  ATRACE_NAME("ConsumerQueue::Dequeue");
  if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
    ALOGE("ConsumerQueue::Dequeue: Invalid parameter.");
    ALOGE("%s: Invalid parameter.", __FUNCTION__);
    return ErrorStatus(EINVAL);
  }

@@ -752,19 +801,18 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
}

Status<void> ConsumerQueue::OnBufferAllocated() {
  ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
  ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id());

  auto status = ImportBuffers();
  if (!status) {
    ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
    ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
          status.GetErrorMessage().c_str());
    return ErrorStatus(status.error());
  } else if (status.get() == 0) {
    ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
    ALOGW("%s: No new buffers allocated!", __FUNCTION__);
    return ErrorStatus(ENOBUFS);
  } else {
    ALOGD_IF(TRACE,
             "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
    ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__,
             status.get());
    return {};
  }
+40 −7
Original line number Diff line number Diff line
@@ -57,10 +57,10 @@ class BufferHubQueue : public pdx::Client {
  uint32_t default_width() const { return default_width_; }

  // Returns the default buffer height of this buffer queue.
  uint32_t default_height() const { return static_cast<uint32_t>(default_height_); }
  uint32_t default_height() const { return default_height_; }

  // Returns the default buffer format of this buffer queue.
  uint32_t default_format() const { return static_cast<uint32_t>(default_format_); }
  uint32_t default_format() const { return default_format_; }

  // Creates a new consumer in handle form for immediate transport over RPC.
  pdx::Status<pdx::LocalChannelHandle> CreateConsumerQueueHandle(
@@ -208,6 +208,14 @@ class BufferHubQueue : public pdx::Client {
  // Size of the metadata that buffers in this queue cary.
  size_t user_metadata_size_{0};

  // Buffers and related data that are available for dequeue.
  std::priority_queue<Entry, std::vector<Entry>, EntryComparator>
      available_buffers_;

  // Slot of the buffers that are not available for normal dequeue. For example,
  // the slot of posted or acquired buffers in the perspective of a producer.
  std::vector<size_t> unavailable_buffers_slot_;

 private:
  void Initialize();

@@ -252,10 +260,6 @@ class BufferHubQueue : public pdx::Client {
  // queue regardless of its queue position or presence in the ring buffer.
  std::array<std::shared_ptr<BufferHubBase>, kMaxQueueCapacity> buffers_;

  // Buffers and related data that are available for dequeue.
  std::priority_queue<Entry, std::vector<Entry>, EntryComparator>
      available_buffers_;

  // Keeps track with how many buffers have been added into the queue.
  size_t capacity_{0};

@@ -349,11 +353,30 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
  // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
  // and caller should call Post() once it's done writing to release the buffer
  // to the consumer side.
  // @return a buffer in gained state, which was originally in released state.
  pdx::Status<std::shared_ptr<BufferProducer>> Dequeue(
      int timeout, size_t* slot, pdx::LocalHandle* release_fence);

  // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
  // and caller should call Post() once it's done writing to release the buffer
  // to the consumer side.
  //
  // @param timeout to dequeue a buffer.
  // @param slot is the slot of the output BufferProducer.
  // @param release_fence for gaining a buffer.
  // @param out_meta metadata of the output buffer.
  // @param gain_posted_buffer whether to gain posted buffer if no released
  //     buffer is available to gain.
  // @return a buffer in gained state, which was originally in released state if
  //     gain_posted_buffer is false, or in posted/released state if
  //     gain_posted_buffer is true.
  // TODO(b/112007999): gain_posted_buffer true is only used to prevent
  // libdvrtracking from starving when there are non-responding clients. This
  // gain_posted_buffer param can be removed once libdvrtracking start to use
  // the new AHardwareBuffer API.
  pdx::Status<std::shared_ptr<BufferProducer>> Dequeue(
      int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
      pdx::LocalHandle* release_fence);
      pdx::LocalHandle* release_fence, bool gain_posted_buffer = false);

  // Enqueues a producer buffer in the queue.
  pdx::Status<void> Enqueue(const std::shared_ptr<BufferProducer>& buffer,
@@ -374,6 +397,16 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
  // arguments as the constructors.
  explicit ProducerQueue(pdx::LocalChannelHandle handle);
  ProducerQueue(const ProducerQueueConfig& config, const UsagePolicy& usage);

  // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
  // and caller should call Post() once it's done writing to release the buffer
  // to the consumer side.
  //
  // @param slot the slot of the returned buffer.
  // @return a buffer in gained state, which was originally in posted state or
  //     released state.
  pdx::Status<std::shared_ptr<BufferProducer>> DequeueUnacquiredBuffer(
      size_t* slot);
};

class ConsumerQueue : public BufferHubQueue {
+144 −2
Original line number Diff line number Diff line
#include <base/logging.h>
#include <binder/Parcel.h>
#include <dvr/dvr_api.h>
#include <private/dvr/buffer_hub_client.h>
#include <private/dvr/buffer_hub_queue_client.h>

@@ -122,6 +123,147 @@ TEST_F(BufferHubQueueTest, TestDequeue) {
  }
}

TEST_F(BufferHubQueueTest,
       TestDequeuePostedBufferIfNoAvailableReleasedBuffer_withBufferConsumer) {
  ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));

  // Allocate 3 buffers to use.
  const size_t test_queue_capacity = 3;
  for (int64_t i = 0; i < test_queue_capacity; i++) {
    AllocateBuffer();
  }
  EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity);

  size_t producer_slot, consumer_slot;
  LocalHandle fence;
  DvrNativeBufferMetadata mi, mo;

  // Producer posts 2 buffers and remember their posted sequence.
  std::deque<size_t> posted_slots;
  for (int64_t i = 0; i < 2; i++) {
    auto p1_status =
        producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true);
    EXPECT_TRUE(p1_status.ok());
    auto p1 = p1_status.take();
    ASSERT_NE(p1, nullptr);

    // Producer should not be gaining posted buffer when there are still
    // available buffers to gain.
    auto found_iter =
        std::find(posted_slots.begin(), posted_slots.end(), producer_slot);
    EXPECT_EQ(found_iter, posted_slots.end());
    posted_slots.push_back(producer_slot);

    // Producer posts the buffer.
    mi.index = i;
    EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle()));
  }

  // Consumer acquires one buffer.
  auto c1_status =
      consumer_queue_->Dequeue(kTimeoutMs, &consumer_slot, &mo, &fence);
  EXPECT_TRUE(c1_status.ok());
  auto c1 = c1_status.take();
  ASSERT_NE(c1, nullptr);
  // Consumer should get the oldest posted buffer. No checks here.
  // posted_slots[0] should be in acquired state now.
  EXPECT_EQ(mo.index, 0);
  // Consumer releases the buffer.
  EXPECT_EQ(c1->ReleaseAsync(&mi, LocalHandle()), 0);
  // posted_slots[0] should be in released state now.

  // Producer gain and post 2 buffers.
  for (int64_t i = 0; i < 2; i++) {
    auto p1_status =
        producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true);
    EXPECT_TRUE(p1_status.ok());
    auto p1 = p1_status.take();
    ASSERT_NE(p1, nullptr);

    // The gained buffer should be the one in released state or the one haven't
    // been use.
    EXPECT_NE(posted_slots[1], producer_slot);

    mi.index = i + 2;
    EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle()));
  }

  // Producer gains a buffer.
  auto p1_status =
      producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true);
  EXPECT_TRUE(p1_status.ok());
  auto p1 = p1_status.take();
  ASSERT_NE(p1, nullptr);

  // The gained buffer should be the oldest posted buffer.
  EXPECT_EQ(posted_slots[1], producer_slot);

  // Producer posts the buffer.
  mi.index = 4;
  EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle()));
}

TEST_F(BufferHubQueueTest,
       TestDequeuePostedBufferIfNoAvailableReleasedBuffer_noBufferConsumer) {
  ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{}));

  // Allocate 4 buffers to use.
  const size_t test_queue_capacity = 4;
  for (int64_t i = 0; i < test_queue_capacity; i++) {
    AllocateBuffer();
  }
  EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity);

  // Post all allowed buffers and remember their posted sequence.
  std::deque<size_t> posted_slots;
  for (int64_t i = 0; i < test_queue_capacity; i++) {
    size_t slot;
    LocalHandle fence;
    DvrNativeBufferMetadata mi, mo;

    // Producer gains a buffer.
    auto p1_status =
        producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true);
    EXPECT_TRUE(p1_status.ok());
    auto p1 = p1_status.take();
    ASSERT_NE(p1, nullptr);

    // Producer should not be gaining posted buffer when there are still
    // available buffers to gain.
    auto found_iter = std::find(posted_slots.begin(), posted_slots.end(), slot);
    EXPECT_EQ(found_iter, posted_slots.end());
    posted_slots.push_back(slot);

    // Producer posts the buffer.
    mi.index = i;
    EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0);
  }

  // Gain posted buffers in sequence.
  const int64_t nb_dequeue_all_times = 2;
  for (int j = 0; j < nb_dequeue_all_times; ++j) {
    for (int i = 0; i < test_queue_capacity; ++i) {
      size_t slot;
      LocalHandle fence;
      DvrNativeBufferMetadata mi, mo;

      // Producer gains a buffer.
      auto p1_status =
          producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true);
      EXPECT_TRUE(p1_status.ok());
      auto p1 = p1_status.take();
      ASSERT_NE(p1, nullptr);

      // The gained buffer should be the oldest posted buffer.
      EXPECT_EQ(posted_slots[i], slot);

      // Producer posts the buffer.
      mi.index = i + test_queue_capacity * (j + 1);
      EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0);
    }
  }
}

TEST_F(BufferHubQueueTest, TestProducerConsumer) {
  const size_t kBufferCount = 16;
  size_t slot;
@@ -245,8 +387,8 @@ TEST_F(BufferHubQueueTest, TestRemoveBuffer) {

  for (size_t i = 0; i < kBufferCount; i++) {
    Entry* entry = &buffers[i];
    auto producer_status = producer_queue_->Dequeue(
        kTimeoutMs, &entry->slot, &mo, &entry->fence);
    auto producer_status =
        producer_queue_->Dequeue(kTimeoutMs, &entry->slot, &mo, &entry->fence);
    ASSERT_TRUE(producer_status.ok());
    entry->buffer = producer_status.take();
    ASSERT_NE(nullptr, entry->buffer);