Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ed654322 authored by Jiwen 'Steve' Cai's avatar Jiwen 'Steve' Cai
Browse files

Expose acquire_fence though ConsumerQueue::Dequeue

The current compositor implementation will be refactored to use
BufferHubQueue instead of ad-hoc BufferHub operations. We need this to
expose release_fence to compositor so that it can wait for buffers
to become avaiable by checking fence properly.

Bug: 36033302
Bug: 36148608
Test: Built and ran buffer_hub_queue-test
Change-Id: I75cfcb02e06a4b9e7e89b89690ca2d92ee09a678
parent afbdf1f1
Loading
Loading
Loading
Loading
+32 −17
Original line number Original line Diff line number Diff line
@@ -13,9 +13,6 @@
#include <pdx/file_handle.h>
#include <pdx/file_handle.h>
#include <private/dvr/bufferhub_rpc.h>
#include <private/dvr/bufferhub_rpc.h>


using android::pdx::LocalHandle;
using android::pdx::LocalChannelHandle;

namespace android {
namespace android {
namespace dvr {
namespace dvr {


@@ -28,6 +25,7 @@ BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle,
      buffers_(BufferHubQueue::kMaxQueueCapacity),
      buffers_(BufferHubQueue::kMaxQueueCapacity),
      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
      available_buffers_(BufferHubQueue::kMaxQueueCapacity),
      available_buffers_(BufferHubQueue::kMaxQueueCapacity),
      fences_(BufferHubQueue::kMaxQueueCapacity),
      capacity_(0) {
      capacity_(0) {
  Initialize();
  Initialize();
}
}
@@ -41,6 +39,7 @@ BufferHubQueue::BufferHubQueue(const std::string& endpoint_path,
      buffers_(BufferHubQueue::kMaxQueueCapacity),
      buffers_(BufferHubQueue::kMaxQueueCapacity),
      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
      epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
      available_buffers_(BufferHubQueue::kMaxQueueCapacity),
      available_buffers_(BufferHubQueue::kMaxQueueCapacity),
      fences_(BufferHubQueue::kMaxQueueCapacity),
      capacity_(0) {
      capacity_(0) {
  Initialize();
  Initialize();
}
}
@@ -134,7 +133,7 @@ void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) {


  int events = status.get();
  int events = status.get();
  if (events & EPOLLIN) {
  if (events & EPOLLIN) {
    int ret = OnBufferReady(buffer);
    int ret = OnBufferReady(buffer, &fences_[slot]);
    if (ret < 0) {
    if (ret < 0) {
      ALOGE("Failed to set buffer ready: %s", strerror(-ret));
      ALOGE("Failed to set buffer ready: %s", strerror(-ret));
      return;
      return;
@@ -254,7 +253,8 @@ void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf,


std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
                                                         size_t* slot,
                                                         size_t* slot,
                                                         void* meta) {
                                                         void* meta,
                                                         LocalHandle* fence) {
  ALOGD("Dequeue: count=%zu, timeout=%d", count(), timeout);
  ALOGD("Dequeue: count=%zu, timeout=%d", count(), timeout);


  if (count() == 0 && !WaitForBuffers(timeout))
  if (count() == 0 && !WaitForBuffers(timeout))
@@ -263,6 +263,8 @@ std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout,
  std::shared_ptr<BufferHubBuffer> buf;
  std::shared_ptr<BufferHubBuffer> buf;
  BufferInfo& buffer_info = available_buffers_.Front();
  BufferInfo& buffer_info = available_buffers_.Front();


  *fence = std::move(fences_[buffer_info.slot]);

  // Report current pos as the output slot.
  // Report current pos as the output slot.
  std::swap(buffer_info.slot, *slot);
  std::swap(buffer_info.slot, *slot);
  // Swap buffer from vector to be returned later.
  // Swap buffer from vector to be returned later.
@@ -373,15 +375,21 @@ int ProducerQueue::DetachBuffer(size_t slot) {
  return BufferHubQueue::DetachBuffer(slot);
  return BufferHubQueue::DetachBuffer(slot);
}
}


std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(int timeout,
std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(
                                                       size_t* slot) {
    int timeout, size_t* slot, LocalHandle* release_fence) {
  auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr);
  if (slot == nullptr || release_fence == nullptr) {
    ALOGE("invalid parameter, slot=%p, release_fence=%p", slot, release_fence);
    return nullptr;
  }

  auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
  return std::static_pointer_cast<BufferProducer>(buf);
  return std::static_pointer_cast<BufferProducer>(buf);
}
}


int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) {
int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
                                 LocalHandle* release_fence) {
  auto buffer = std::static_pointer_cast<BufferProducer>(buf);
  auto buffer = std::static_pointer_cast<BufferProducer>(buf);
  return buffer->GainAsync();
  return buffer->Gain(release_fence);
}
}


ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, size_t meta_size)
ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, size_t meta_size)
@@ -431,9 +439,9 @@ int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
  return BufferHubQueue::AddBuffer(buf, slot);
  return BufferHubQueue::AddBuffer(buf, slot);
}
}


std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout,
std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(
                                                       size_t* slot, void* meta,
    int timeout, size_t* slot, void* meta, size_t meta_size,
                                                       size_t meta_size) {
    LocalHandle* acquire_fence) {
  if (meta_size != meta_size_) {
  if (meta_size != meta_size_) {
    ALOGE(
    ALOGE(
        "metadata size (%zu) for the dequeuing buffer does not match metadata "
        "metadata size (%zu) for the dequeuing buffer does not match metadata "
@@ -441,14 +449,21 @@ std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout,
        meta_size, meta_size_);
        meta_size, meta_size_);
    return nullptr;
    return nullptr;
  }
  }
  auto buf = BufferHubQueue::Dequeue(timeout, slot, meta);

  if (slot == nullptr || meta == nullptr || acquire_fence == nullptr) {
    ALOGE("invalid parameter, slot=%p, meta=%p, acquire_fence=%p", slot, meta,
          acquire_fence);
    return nullptr;
  }

  auto buf = BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
  return std::static_pointer_cast<BufferConsumer>(buf);
  return std::static_pointer_cast<BufferConsumer>(buf);
}
}


int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) {
int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
                                 LocalHandle* acquire_fence) {
  auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
  auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
  LocalHandle fence;
  return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
  return buffer->Acquire(&fence, meta_buffer_tmp_.get(), meta_size_);
}
}


int ConsumerQueue::OnBufferAllocated() {
int ConsumerQueue::OnBufferAllocated() {
+2 −1
Original line number Original line Diff line number Diff line
@@ -83,8 +83,9 @@ status_t BufferHubQueueProducer::dequeueBuffer(int* out_slot,
  std::shared_ptr<BufferProducer> buffer_producer;
  std::shared_ptr<BufferProducer> buffer_producer;


  for (size_t retry = 0; retry < BufferHubQueue::kMaxQueueCapacity; retry++) {
  for (size_t retry = 0; retry < BufferHubQueue::kMaxQueueCapacity; retry++) {
    LocalHandle fence;
    buffer_producer =
    buffer_producer =
        core_->producer_->Dequeue(core_->dequeue_timeout_ms_, &slot);
        core_->producer_->Dequeue(core_->dequeue_timeout_ms_, &slot, &fence);
    if (!buffer_producer)
    if (!buffer_producer)
      return NO_MEMORY;
      return NO_MEMORY;


+21 −11
Original line number Original line Diff line number Diff line
@@ -20,6 +20,7 @@ class ConsumerQueue;
// automatically re-requeued when released by the remote side.
// automatically re-requeued when released by the remote side.
class BufferHubQueue : public pdx::Client {
class BufferHubQueue : public pdx::Client {
 public:
 public:
  using LocalHandle = pdx::LocalHandle;
  using LocalChannelHandle = pdx::LocalChannelHandle;
  using LocalChannelHandle = pdx::LocalChannelHandle;
  template <typename T>
  template <typename T>
  using Status = pdx::Status<T>;
  using Status = pdx::Status<T>;
@@ -91,14 +92,15 @@ class BufferHubQueue : public pdx::Client {
  // while specifying a timeout equal to zero cause |Dequeue()| to return
  // while specifying a timeout equal to zero cause |Dequeue()| to return
  // immediately, even if no buffers are available.
  // immediately, even if no buffers are available.
  std::shared_ptr<BufferHubBuffer> Dequeue(int timeout, size_t* slot,
  std::shared_ptr<BufferHubBuffer> Dequeue(int timeout, size_t* slot,
                                           void* meta);
                                           void* meta, LocalHandle* fence);


  // Wait for buffers to be released and re-add them to the queue.
  // Wait for buffers to be released and re-add them to the queue.
  bool WaitForBuffers(int timeout);
  bool WaitForBuffers(int timeout);
  void HandleBufferEvent(size_t slot, const epoll_event& event);
  void HandleBufferEvent(size_t slot, const epoll_event& event);
  void HandleQueueEvent(const epoll_event& event);
  void HandleQueueEvent(const epoll_event& event);


  virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) = 0;
  virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
                            LocalHandle* fence) = 0;


  // Called when a buffer is allocated remotely.
  // Called when a buffer is allocated remotely.
  virtual int OnBufferAllocated() = 0;
  virtual int OnBufferAllocated() = 0;
@@ -202,6 +204,10 @@ class BufferHubQueue : public pdx::Client {
  // prevent the buffer from being deleted.
  // prevent the buffer from being deleted.
  RingBuffer<BufferInfo> available_buffers_;
  RingBuffer<BufferInfo> available_buffers_;


  // Fences (acquire fence for consumer and release fence for consumer) , one
  // for each buffer slot.
  std::vector<LocalHandle> fences_;

  // Keep track with how many buffers have been added into the queue.
  // Keep track with how many buffers have been added into the queue.
  size_t capacity_;
  size_t capacity_;


@@ -274,7 +280,8 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
  // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
  // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode,
  // and caller should call Post() once it's done writing to release the buffer
  // and caller should call Post() once it's done writing to release the buffer
  // to the consumer side.
  // to the consumer side.
  std::shared_ptr<BufferProducer> Dequeue(int timeout, size_t* slot);
  std::shared_ptr<BufferProducer> Dequeue(int timeout, size_t* slot,
                                          LocalHandle* release_fence);


 private:
 private:
  friend BASE;
  friend BASE;
@@ -287,7 +294,8 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> {
  ProducerQueue(size_t meta_size, int usage_set_mask, int usage_clear_mask,
  ProducerQueue(size_t meta_size, int usage_set_mask, int usage_clear_mask,
                int usage_deny_set_mask, int usage_deny_clear_mask);
                int usage_deny_set_mask, int usage_deny_clear_mask);


  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) override;
  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
                    LocalHandle* release_fence) override;


  // Producer buffer is always allocated from the client (i.e. local) side.
  // Producer buffer is always allocated from the client (i.e. local) side.
  int OnBufferAllocated() override { return 0; }
  int OnBufferAllocated() override { return 0; }
@@ -316,14 +324,11 @@ class ConsumerQueue : public pdx::ClientBase<ConsumerQueue, BufferHubQueue> {
  // Dequeue() is done with the corect metadata type and size with those used
  // Dequeue() is done with the corect metadata type and size with those used
  // when the buffer is orignally created.
  // when the buffer is orignally created.
  template <typename Meta>
  template <typename Meta>
  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot,
  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, Meta* meta,
                                          Meta* meta) {
                                          LocalHandle* acquire_fence) {
    return Dequeue(timeout, slot, meta, sizeof(*meta));
    return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence);
  }
  }


  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta,
                                          size_t meta_size);

 private:
 private:
  friend BASE;
  friend BASE;


@@ -335,9 +340,14 @@ class ConsumerQueue : public pdx::ClientBase<ConsumerQueue, BufferHubQueue> {
  // consumer.
  // consumer.
  int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot);
  int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot);


  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) override;
  int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf,
                    LocalHandle* acquire_fence) override;


  int OnBufferAllocated() override;
  int OnBufferAllocated() override;

  std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta,
                                          size_t meta_size,
                                          LocalHandle* acquire_fence);
};
};


}  // namespace dvr
}  // namespace dvr
+28 −19
Original line number Original line Diff line number Diff line
@@ -59,12 +59,13 @@ TEST_F(BufferHubQueueTest, TestDequeue) {
  // But dequeue multiple times.
  // But dequeue multiple times.
  for (size_t i = 0; i < nb_dequeue_times; i++) {
  for (size_t i = 0; i < nb_dequeue_times; i++) {
    size_t slot;
    size_t slot;
    auto p1 = producer_queue_->Dequeue(0, &slot);
    LocalHandle fence;
    auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
    ASSERT_NE(nullptr, p1);
    ASSERT_NE(nullptr, p1);
    size_t mi = i;
    size_t mi = i;
    ASSERT_EQ(p1->Post(LocalHandle(), &mi, sizeof(mi)), 0);
    ASSERT_EQ(p1->Post(LocalHandle(), &mi, sizeof(mi)), 0);
    size_t mo;
    size_t mo;
    auto c1 = consumer_queue_->Dequeue(100, &slot, &mo);
    auto c1 = consumer_queue_->Dequeue(100, &slot, &mo, &fence);
    ASSERT_NE(nullptr, c1);
    ASSERT_NE(nullptr, c1);
    ASSERT_EQ(mi, mo);
    ASSERT_EQ(mi, mo);
    c1->Release(LocalHandle());
    c1->Release(LocalHandle());
@@ -91,19 +92,21 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) {
    ASSERT_EQ(consumer_queue_->capacity(), i);
    ASSERT_EQ(consumer_queue_->capacity(), i);
    // Dequeue returns nullptr since no buffer is ready to consumer, but
    // Dequeue returns nullptr since no buffer is ready to consumer, but
    // this implicitly triggers buffer import and bump up |capacity|.
    // this implicitly triggers buffer import and bump up |capacity|.
    auto consumer_null = consumer_queue_->Dequeue(0, &slot, &seq);
    LocalHandle fence;
    auto consumer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
    ASSERT_EQ(nullptr, consumer_null);
    ASSERT_EQ(nullptr, consumer_null);
    ASSERT_EQ(consumer_queue_->capacity(), i + 1);
    ASSERT_EQ(consumer_queue_->capacity(), i + 1);
  }
  }


  for (size_t i = 0; i < nb_buffer; i++) {
  for (size_t i = 0; i < nb_buffer; i++) {
    LocalHandle fence;
    // First time, there is no buffer available to dequeue.
    // First time, there is no buffer available to dequeue.
    auto buffer_null = consumer_queue_->Dequeue(0, &slot, &seq);
    auto buffer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence);
    ASSERT_EQ(nullptr, buffer_null);
    ASSERT_EQ(nullptr, buffer_null);


    // Make sure Producer buffer is Post()'ed so that it's ready to Accquire
    // Make sure Producer buffer is Post()'ed so that it's ready to Accquire
    // in the consumer's Dequeue() function.
    // in the consumer's Dequeue() function.
    auto producer = producer_queue_->Dequeue(0, &slot);
    auto producer = producer_queue_->Dequeue(0, &slot, &fence);
    ASSERT_NE(nullptr, producer);
    ASSERT_NE(nullptr, producer);


    uint64_t seq_in = static_cast<uint64_t>(i);
    uint64_t seq_in = static_cast<uint64_t>(i);
@@ -111,7 +114,7 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) {


    // Second time, the just |Post()|'ed buffer should be dequeued.
    // Second time, the just |Post()|'ed buffer should be dequeued.
    uint64_t seq_out = 0;
    uint64_t seq_out = 0;
    auto consumer = consumer_queue_->Dequeue(0, &slot, &seq_out);
    auto consumer = consumer_queue_->Dequeue(0, &slot, &seq_out, &fence);
    ASSERT_NE(nullptr, consumer);
    ASSERT_NE(nullptr, consumer);
    ASSERT_EQ(seq_in, seq_out);
    ASSERT_EQ(seq_in, seq_out);
  }
  }
@@ -132,11 +135,12 @@ TEST_F(BufferHubQueueTest, TestMetadata) {


  for (auto mi : ms) {
  for (auto mi : ms) {
    size_t slot;
    size_t slot;
    auto p1 = producer_queue_->Dequeue(0, &slot);
    LocalHandle fence;
    auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
    ASSERT_NE(nullptr, p1);
    ASSERT_NE(nullptr, p1);
    ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0);
    ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0);
    TestMetadata mo;
    TestMetadata mo;
    auto c1 = consumer_queue_->Dequeue(0, &slot, &mo);
    auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
    ASSERT_EQ(mi.a, mo.a);
    ASSERT_EQ(mi.a, mo.a);
    ASSERT_EQ(mi.b, mo.b);
    ASSERT_EQ(mi.b, mo.b);
    ASSERT_EQ(mi.c, mo.c);
    ASSERT_EQ(mi.c, mo.c);
@@ -150,13 +154,14 @@ TEST_F(BufferHubQueueTest, TestMetadataMismatch) {


  int64_t mi = 3;
  int64_t mi = 3;
  size_t slot;
  size_t slot;
  auto p1 = producer_queue_->Dequeue(0, &slot);
  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_NE(nullptr, p1);
  ASSERT_NE(nullptr, p1);
  ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0);
  ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0);


  int32_t mo;
  int32_t mo;
  // Acquire a buffer with mismatched metadata is not OK.
  // Acquire a buffer with mismatched metadata is not OK.
  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo);
  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
  ASSERT_EQ(nullptr, c1);
  ASSERT_EQ(nullptr, c1);
}
}


@@ -165,12 +170,13 @@ TEST_F(BufferHubQueueTest, TestEnqueue) {
  AllocateBuffer();
  AllocateBuffer();


  size_t slot;
  size_t slot;
  auto p1 = producer_queue_->Dequeue(0, &slot);
  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_NE(nullptr, p1);
  ASSERT_NE(nullptr, p1);


  int64_t mo;
  int64_t mo;
  producer_queue_->Enqueue(p1, slot);
  producer_queue_->Enqueue(p1, slot);
  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo);
  auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence);
  ASSERT_EQ(nullptr, c1);
  ASSERT_EQ(nullptr, c1);
}
}


@@ -179,12 +185,13 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) {


  size_t s1;
  size_t s1;
  AllocateBuffer();
  AllocateBuffer();
  auto p1 = producer_queue_->Dequeue(0, &s1);
  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &s1, &fence);
  ASSERT_NE(nullptr, p1);
  ASSERT_NE(nullptr, p1);


  // producer queue is exhausted
  // producer queue is exhausted
  size_t s2;
  size_t s2;
  auto p2_null = producer_queue_->Dequeue(0, &s2);
  auto p2_null = producer_queue_->Dequeue(0, &s2, &fence);
  ASSERT_EQ(nullptr, p2_null);
  ASSERT_EQ(nullptr, p2_null);


  // dynamically add buffer.
  // dynamically add buffer.
@@ -193,7 +200,7 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) {
  ASSERT_EQ(producer_queue_->capacity(), 2U);
  ASSERT_EQ(producer_queue_->capacity(), 2U);


  // now we can dequeue again
  // now we can dequeue again
  auto p2 = producer_queue_->Dequeue(0, &s2);
  auto p2 = producer_queue_->Dequeue(0, &s2, &fence);
  ASSERT_NE(nullptr, p2);
  ASSERT_NE(nullptr, p2);
  ASSERT_EQ(producer_queue_->count(), 0U);
  ASSERT_EQ(producer_queue_->count(), 0U);
  // p1 and p2 should have different slot number
  // p1 and p2 should have different slot number
@@ -206,14 +213,14 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) {
  int64_t seq = 1;
  int64_t seq = 1;
  ASSERT_EQ(p1->Post(LocalHandle(), seq), 0);
  ASSERT_EQ(p1->Post(LocalHandle(), seq), 0);
  size_t cs1, cs2;
  size_t cs1, cs2;
  auto c1 = consumer_queue_->Dequeue(0, &cs1, &seq);
  auto c1 = consumer_queue_->Dequeue(0, &cs1, &seq, &fence);
  ASSERT_NE(nullptr, c1);
  ASSERT_NE(nullptr, c1);
  ASSERT_EQ(consumer_queue_->count(), 0U);
  ASSERT_EQ(consumer_queue_->count(), 0U);
  ASSERT_EQ(consumer_queue_->capacity(), 2U);
  ASSERT_EQ(consumer_queue_->capacity(), 2U);
  ASSERT_EQ(cs1, s1);
  ASSERT_EQ(cs1, s1);


  ASSERT_EQ(p2->Post(LocalHandle(), seq), 0);
  ASSERT_EQ(p2->Post(LocalHandle(), seq), 0);
  auto c2 = consumer_queue_->Dequeue(0, &cs2, &seq);
  auto c2 = consumer_queue_->Dequeue(0, &cs2, &seq, &fence);
  ASSERT_NE(nullptr, c2);
  ASSERT_NE(nullptr, c2);
  ASSERT_EQ(cs2, s2);
  ASSERT_EQ(cs2, s2);
}
}
@@ -229,7 +236,8 @@ TEST_F(BufferHubQueueTest, TestUsageSetMask) {
      kBufferSliceCount, &slot);
      kBufferSliceCount, &slot);
  ASSERT_EQ(ret, 0);
  ASSERT_EQ(ret, 0);


  auto p1 = producer_queue_->Dequeue(0, &slot);
  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_EQ(p1->usage() & set_mask, set_mask);
  ASSERT_EQ(p1->usage() & set_mask, set_mask);
}
}


@@ -244,7 +252,8 @@ TEST_F(BufferHubQueueTest, TestUsageClearMask) {
      kBufferSliceCount, &slot);
      kBufferSliceCount, &slot);
  ASSERT_EQ(ret, 0);
  ASSERT_EQ(ret, 0);


  auto p1 = producer_queue_->Dequeue(0, &slot);
  LocalHandle fence;
  auto p1 = producer_queue_->Dequeue(0, &slot, &fence);
  ASSERT_EQ(p1->usage() & clear_mask, 0);
  ASSERT_EQ(p1->usage() & clear_mask, 0);
}
}


+3 −1
Original line number Original line Diff line number Diff line
@@ -79,7 +79,9 @@ GLuint VideoCompositor::GetActiveTextureId(EGLDisplay display) {
    // queued in order from the producer side.
    // queued in order from the producer side.
    // TODO(jwcai) Use |metadata.timestamp_ns| to schedule video frames
    // TODO(jwcai) Use |metadata.timestamp_ns| to schedule video frames
    // accurately.
    // accurately.
    auto buffer_consumer = consumer_queue_->Dequeue(0, &slot, &metadata);
    LocalHandle acquire_fence;
    auto buffer_consumer =
        consumer_queue_->Dequeue(0, &slot, &metadata, &acquire_fence);


    if (buffer_consumer) {
    if (buffer_consumer) {
      // Create a new texture if it hasn't been created yet, or the same slot
      // Create a new texture if it hasn't been created yet, or the same slot