Loading libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp +32 −17 Original line number Diff line number Diff line Loading @@ -13,9 +13,6 @@ #include <pdx/file_handle.h> #include <private/dvr/bufferhub_rpc.h> using android::pdx::LocalHandle; using android::pdx::LocalChannelHandle; namespace android { namespace dvr { Loading @@ -28,6 +25,7 @@ BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle, buffers_(BufferHubQueue::kMaxQueueCapacity), epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false), available_buffers_(BufferHubQueue::kMaxQueueCapacity), fences_(BufferHubQueue::kMaxQueueCapacity), capacity_(0) { Initialize(); } Loading @@ -41,6 +39,7 @@ BufferHubQueue::BufferHubQueue(const std::string& endpoint_path, buffers_(BufferHubQueue::kMaxQueueCapacity), epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false), available_buffers_(BufferHubQueue::kMaxQueueCapacity), fences_(BufferHubQueue::kMaxQueueCapacity), capacity_(0) { Initialize(); } Loading Loading @@ -134,7 +133,7 @@ void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) { int events = status.get(); if (events & EPOLLIN) { int ret = OnBufferReady(buffer); int ret = OnBufferReady(buffer, &fences_[slot]); if (ret < 0) { ALOGE("Failed to set buffer ready: %s", strerror(-ret)); return; Loading Loading @@ -254,7 +253,8 @@ void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf, std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout, size_t* slot, void* meta) { void* meta, LocalHandle* fence) { ALOGD("Dequeue: count=%zu, timeout=%d", count(), timeout); if (count() == 0 && !WaitForBuffers(timeout)) Loading @@ -263,6 +263,8 @@ std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout, std::shared_ptr<BufferHubBuffer> buf; BufferInfo& buffer_info = available_buffers_.Front(); *fence = std::move(fences_[buffer_info.slot]); // Report current pos as the output slot. std::swap(buffer_info.slot, *slot); // Swap buffer from vector to be returned later. Loading Loading @@ -373,15 +375,21 @@ int ProducerQueue::DetachBuffer(size_t slot) { return BufferHubQueue::DetachBuffer(slot); } std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(int timeout, size_t* slot) { auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr); std::shared_ptr<BufferProducer> ProducerQueue::Dequeue( int timeout, size_t* slot, LocalHandle* release_fence) { if (slot == nullptr || release_fence == nullptr) { ALOGE("invalid parameter, slot=%p, release_fence=%p", slot, release_fence); return nullptr; } auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence); return std::static_pointer_cast<BufferProducer>(buf); } int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) { int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* release_fence) { auto buffer = std::static_pointer_cast<BufferProducer>(buf); return buffer->GainAsync(); return buffer->Gain(release_fence); } ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, size_t meta_size) Loading Loading @@ -431,9 +439,9 @@ int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf, return BufferHubQueue::AddBuffer(buf, slot); } std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout, size_t* slot, void* meta, size_t meta_size) { std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue( int timeout, size_t* slot, void* meta, size_t meta_size, LocalHandle* acquire_fence) { if (meta_size != meta_size_) { ALOGE( "metadata size (%zu) for the dequeuing buffer does not match metadata " Loading @@ -441,14 +449,21 @@ std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout, meta_size, meta_size_); return nullptr; } auto buf = BufferHubQueue::Dequeue(timeout, slot, meta); if (slot == nullptr || meta == nullptr || acquire_fence == nullptr) { ALOGE("invalid parameter, slot=%p, meta=%p, acquire_fence=%p", slot, meta, acquire_fence); return nullptr; } auto buf = BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence); return std::static_pointer_cast<BufferConsumer>(buf); } int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) { int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* acquire_fence) { auto buffer = std::static_pointer_cast<BufferConsumer>(buf); LocalHandle fence; return buffer->Acquire(&fence, meta_buffer_tmp_.get(), meta_size_); return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_); } int ConsumerQueue::OnBufferAllocated() { Loading libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp +2 −1 Original line number Diff line number Diff line Loading @@ -83,8 +83,9 @@ status_t BufferHubQueueProducer::dequeueBuffer(int* out_slot, std::shared_ptr<BufferProducer> buffer_producer; for (size_t retry = 0; retry < BufferHubQueue::kMaxQueueCapacity; retry++) { LocalHandle fence; buffer_producer = core_->producer_->Dequeue(core_->dequeue_timeout_ms_, &slot); core_->producer_->Dequeue(core_->dequeue_timeout_ms_, &slot, &fence); if (!buffer_producer) return NO_MEMORY; Loading libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h +21 −11 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ class ConsumerQueue; // automatically re-requeued when released by the remote side. class BufferHubQueue : public pdx::Client { public: using LocalHandle = pdx::LocalHandle; using LocalChannelHandle = pdx::LocalChannelHandle; template <typename T> using Status = pdx::Status<T>; Loading Loading @@ -91,14 +92,15 @@ class BufferHubQueue : public pdx::Client { // while specifying a timeout equal to zero cause |Dequeue()| to return // immediately, even if no buffers are available. std::shared_ptr<BufferHubBuffer> Dequeue(int timeout, size_t* slot, void* meta); void* meta, LocalHandle* fence); // Wait for buffers to be released and re-add them to the queue. bool WaitForBuffers(int timeout); void HandleBufferEvent(size_t slot, const epoll_event& event); void HandleQueueEvent(const epoll_event& event); virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) = 0; virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* fence) = 0; // Called when a buffer is allocated remotely. virtual int OnBufferAllocated() = 0; Loading Loading @@ -202,6 +204,10 @@ class BufferHubQueue : public pdx::Client { // prevent the buffer from being deleted. RingBuffer<BufferInfo> available_buffers_; // Fences (acquire fence for consumer and release fence for consumer) , one // for each buffer slot. std::vector<LocalHandle> fences_; // Keep track with how many buffers have been added into the queue. size_t capacity_; Loading Loading @@ -274,7 +280,8 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, // and caller should call Post() once it's done writing to release the buffer // to the consumer side. std::shared_ptr<BufferProducer> Dequeue(int timeout, size_t* slot); std::shared_ptr<BufferProducer> Dequeue(int timeout, size_t* slot, LocalHandle* release_fence); private: friend BASE; Loading @@ -287,7 +294,8 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { ProducerQueue(size_t meta_size, int usage_set_mask, int usage_clear_mask, int usage_deny_set_mask, int usage_deny_clear_mask); int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) override; int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* release_fence) override; // Producer buffer is always allocated from the client (i.e. local) side. int OnBufferAllocated() override { return 0; } Loading Loading @@ -316,14 +324,11 @@ class ConsumerQueue : public pdx::ClientBase<ConsumerQueue, BufferHubQueue> { // Dequeue() is done with the corect metadata type and size with those used // when the buffer is orignally created. template <typename Meta> std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, Meta* meta) { return Dequeue(timeout, slot, meta, sizeof(*meta)); std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, Meta* meta, LocalHandle* acquire_fence) { return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence); } std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta, size_t meta_size); private: friend BASE; Loading @@ -335,9 +340,14 @@ class ConsumerQueue : public pdx::ClientBase<ConsumerQueue, BufferHubQueue> { // consumer. int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot); int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) override; int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* acquire_fence) override; int OnBufferAllocated() override; std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta, size_t meta_size, LocalHandle* acquire_fence); }; } // namespace dvr Loading libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp +28 −19 Original line number Diff line number Diff line Loading @@ -59,12 +59,13 @@ TEST_F(BufferHubQueueTest, TestDequeue) { // But dequeue multiple times. for (size_t i = 0; i < nb_dequeue_times; i++) { size_t slot; auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, p1); size_t mi = i; ASSERT_EQ(p1->Post(LocalHandle(), &mi, sizeof(mi)), 0); size_t mo; auto c1 = consumer_queue_->Dequeue(100, &slot, &mo); auto c1 = consumer_queue_->Dequeue(100, &slot, &mo, &fence); ASSERT_NE(nullptr, c1); ASSERT_EQ(mi, mo); c1->Release(LocalHandle()); Loading @@ -91,19 +92,21 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) { ASSERT_EQ(consumer_queue_->capacity(), i); // Dequeue returns nullptr since no buffer is ready to consumer, but // this implicitly triggers buffer import and bump up |capacity|. auto consumer_null = consumer_queue_->Dequeue(0, &slot, &seq); LocalHandle fence; auto consumer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence); ASSERT_EQ(nullptr, consumer_null); ASSERT_EQ(consumer_queue_->capacity(), i + 1); } for (size_t i = 0; i < nb_buffer; i++) { LocalHandle fence; // First time, there is no buffer available to dequeue. auto buffer_null = consumer_queue_->Dequeue(0, &slot, &seq); auto buffer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence); ASSERT_EQ(nullptr, buffer_null); // Make sure Producer buffer is Post()'ed so that it's ready to Accquire // in the consumer's Dequeue() function. auto producer = producer_queue_->Dequeue(0, &slot); auto producer = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, producer); uint64_t seq_in = static_cast<uint64_t>(i); Loading @@ -111,7 +114,7 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) { // Second time, the just |Post()|'ed buffer should be dequeued. uint64_t seq_out = 0; auto consumer = consumer_queue_->Dequeue(0, &slot, &seq_out); auto consumer = consumer_queue_->Dequeue(0, &slot, &seq_out, &fence); ASSERT_NE(nullptr, consumer); ASSERT_EQ(seq_in, seq_out); } Loading @@ -132,11 +135,12 @@ TEST_F(BufferHubQueueTest, TestMetadata) { for (auto mi : ms) { size_t slot; auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, p1); ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0); TestMetadata mo; auto c1 = consumer_queue_->Dequeue(0, &slot, &mo); auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence); ASSERT_EQ(mi.a, mo.a); ASSERT_EQ(mi.b, mo.b); ASSERT_EQ(mi.c, mo.c); Loading @@ -150,13 +154,14 @@ TEST_F(BufferHubQueueTest, TestMetadataMismatch) { int64_t mi = 3; size_t slot; auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, p1); ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0); int32_t mo; // Acquire a buffer with mismatched metadata is not OK. auto c1 = consumer_queue_->Dequeue(0, &slot, &mo); auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence); ASSERT_EQ(nullptr, c1); } Loading @@ -165,12 +170,13 @@ TEST_F(BufferHubQueueTest, TestEnqueue) { AllocateBuffer(); size_t slot; auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, p1); int64_t mo; producer_queue_->Enqueue(p1, slot); auto c1 = consumer_queue_->Dequeue(0, &slot, &mo); auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence); ASSERT_EQ(nullptr, c1); } Loading @@ -179,12 +185,13 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { size_t s1; AllocateBuffer(); auto p1 = producer_queue_->Dequeue(0, &s1); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &s1, &fence); ASSERT_NE(nullptr, p1); // producer queue is exhausted size_t s2; auto p2_null = producer_queue_->Dequeue(0, &s2); auto p2_null = producer_queue_->Dequeue(0, &s2, &fence); ASSERT_EQ(nullptr, p2_null); // dynamically add buffer. Loading @@ -193,7 +200,7 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { ASSERT_EQ(producer_queue_->capacity(), 2U); // now we can dequeue again auto p2 = producer_queue_->Dequeue(0, &s2); auto p2 = producer_queue_->Dequeue(0, &s2, &fence); ASSERT_NE(nullptr, p2); ASSERT_EQ(producer_queue_->count(), 0U); // p1 and p2 should have different slot number Loading @@ -206,14 +213,14 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { int64_t seq = 1; ASSERT_EQ(p1->Post(LocalHandle(), seq), 0); size_t cs1, cs2; auto c1 = consumer_queue_->Dequeue(0, &cs1, &seq); auto c1 = consumer_queue_->Dequeue(0, &cs1, &seq, &fence); ASSERT_NE(nullptr, c1); ASSERT_EQ(consumer_queue_->count(), 0U); ASSERT_EQ(consumer_queue_->capacity(), 2U); ASSERT_EQ(cs1, s1); ASSERT_EQ(p2->Post(LocalHandle(), seq), 0); auto c2 = consumer_queue_->Dequeue(0, &cs2, &seq); auto c2 = consumer_queue_->Dequeue(0, &cs2, &seq, &fence); ASSERT_NE(nullptr, c2); ASSERT_EQ(cs2, s2); } Loading @@ -229,7 +236,8 @@ TEST_F(BufferHubQueueTest, TestUsageSetMask) { kBufferSliceCount, &slot); ASSERT_EQ(ret, 0); auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_EQ(p1->usage() & set_mask, set_mask); } Loading @@ -244,7 +252,8 @@ TEST_F(BufferHubQueueTest, TestUsageClearMask) { kBufferSliceCount, &slot); ASSERT_EQ(ret, 0); auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_EQ(p1->usage() & clear_mask, 0); } Loading libs/vr/libvrflinger/video_compositor.cpp +3 −1 Original line number Diff line number Diff line Loading @@ -79,7 +79,9 @@ GLuint VideoCompositor::GetActiveTextureId(EGLDisplay display) { // queued in order from the producer side. // TODO(jwcai) Use |metadata.timestamp_ns| to schedule video frames // accurately. auto buffer_consumer = consumer_queue_->Dequeue(0, &slot, &metadata); LocalHandle acquire_fence; auto buffer_consumer = consumer_queue_->Dequeue(0, &slot, &metadata, &acquire_fence); if (buffer_consumer) { // Create a new texture if it hasn't been created yet, or the same slot Loading Loading
libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp +32 −17 Original line number Diff line number Diff line Loading @@ -13,9 +13,6 @@ #include <pdx/file_handle.h> #include <private/dvr/bufferhub_rpc.h> using android::pdx::LocalHandle; using android::pdx::LocalChannelHandle; namespace android { namespace dvr { Loading @@ -28,6 +25,7 @@ BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle, buffers_(BufferHubQueue::kMaxQueueCapacity), epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false), available_buffers_(BufferHubQueue::kMaxQueueCapacity), fences_(BufferHubQueue::kMaxQueueCapacity), capacity_(0) { Initialize(); } Loading @@ -41,6 +39,7 @@ BufferHubQueue::BufferHubQueue(const std::string& endpoint_path, buffers_(BufferHubQueue::kMaxQueueCapacity), epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false), available_buffers_(BufferHubQueue::kMaxQueueCapacity), fences_(BufferHubQueue::kMaxQueueCapacity), capacity_(0) { Initialize(); } Loading Loading @@ -134,7 +133,7 @@ void BufferHubQueue::HandleBufferEvent(size_t slot, const epoll_event& event) { int events = status.get(); if (events & EPOLLIN) { int ret = OnBufferReady(buffer); int ret = OnBufferReady(buffer, &fences_[slot]); if (ret < 0) { ALOGE("Failed to set buffer ready: %s", strerror(-ret)); return; Loading Loading @@ -254,7 +253,8 @@ void BufferHubQueue::Enqueue(std::shared_ptr<BufferHubBuffer> buf, std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout, size_t* slot, void* meta) { void* meta, LocalHandle* fence) { ALOGD("Dequeue: count=%zu, timeout=%d", count(), timeout); if (count() == 0 && !WaitForBuffers(timeout)) Loading @@ -263,6 +263,8 @@ std::shared_ptr<BufferHubBuffer> BufferHubQueue::Dequeue(int timeout, std::shared_ptr<BufferHubBuffer> buf; BufferInfo& buffer_info = available_buffers_.Front(); *fence = std::move(fences_[buffer_info.slot]); // Report current pos as the output slot. std::swap(buffer_info.slot, *slot); // Swap buffer from vector to be returned later. Loading Loading @@ -373,15 +375,21 @@ int ProducerQueue::DetachBuffer(size_t slot) { return BufferHubQueue::DetachBuffer(slot); } std::shared_ptr<BufferProducer> ProducerQueue::Dequeue(int timeout, size_t* slot) { auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr); std::shared_ptr<BufferProducer> ProducerQueue::Dequeue( int timeout, size_t* slot, LocalHandle* release_fence) { if (slot == nullptr || release_fence == nullptr) { ALOGE("invalid parameter, slot=%p, release_fence=%p", slot, release_fence); return nullptr; } auto buf = BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence); return std::static_pointer_cast<BufferProducer>(buf); } int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) { int ProducerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* release_fence) { auto buffer = std::static_pointer_cast<BufferProducer>(buf); return buffer->GainAsync(); return buffer->Gain(release_fence); } ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, size_t meta_size) Loading Loading @@ -431,9 +439,9 @@ int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf, return BufferHubQueue::AddBuffer(buf, slot); } std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout, size_t* slot, void* meta, size_t meta_size) { std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue( int timeout, size_t* slot, void* meta, size_t meta_size, LocalHandle* acquire_fence) { if (meta_size != meta_size_) { ALOGE( "metadata size (%zu) for the dequeuing buffer does not match metadata " Loading @@ -441,14 +449,21 @@ std::shared_ptr<BufferConsumer> ConsumerQueue::Dequeue(int timeout, meta_size, meta_size_); return nullptr; } auto buf = BufferHubQueue::Dequeue(timeout, slot, meta); if (slot == nullptr || meta == nullptr || acquire_fence == nullptr) { ALOGE("invalid parameter, slot=%p, meta=%p, acquire_fence=%p", slot, meta, acquire_fence); return nullptr; } auto buf = BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence); return std::static_pointer_cast<BufferConsumer>(buf); } int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) { int ConsumerQueue::OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* acquire_fence) { auto buffer = std::static_pointer_cast<BufferConsumer>(buf); LocalHandle fence; return buffer->Acquire(&fence, meta_buffer_tmp_.get(), meta_size_); return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_); } int ConsumerQueue::OnBufferAllocated() { Loading
libs/vr/libbufferhubqueue/buffer_hub_queue_producer.cpp +2 −1 Original line number Diff line number Diff line Loading @@ -83,8 +83,9 @@ status_t BufferHubQueueProducer::dequeueBuffer(int* out_slot, std::shared_ptr<BufferProducer> buffer_producer; for (size_t retry = 0; retry < BufferHubQueue::kMaxQueueCapacity; retry++) { LocalHandle fence; buffer_producer = core_->producer_->Dequeue(core_->dequeue_timeout_ms_, &slot); core_->producer_->Dequeue(core_->dequeue_timeout_ms_, &slot, &fence); if (!buffer_producer) return NO_MEMORY; Loading
libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h +21 −11 Original line number Diff line number Diff line Loading @@ -20,6 +20,7 @@ class ConsumerQueue; // automatically re-requeued when released by the remote side. class BufferHubQueue : public pdx::Client { public: using LocalHandle = pdx::LocalHandle; using LocalChannelHandle = pdx::LocalChannelHandle; template <typename T> using Status = pdx::Status<T>; Loading Loading @@ -91,14 +92,15 @@ class BufferHubQueue : public pdx::Client { // while specifying a timeout equal to zero cause |Dequeue()| to return // immediately, even if no buffers are available. std::shared_ptr<BufferHubBuffer> Dequeue(int timeout, size_t* slot, void* meta); void* meta, LocalHandle* fence); // Wait for buffers to be released and re-add them to the queue. bool WaitForBuffers(int timeout); void HandleBufferEvent(size_t slot, const epoll_event& event); void HandleQueueEvent(const epoll_event& event); virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) = 0; virtual int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* fence) = 0; // Called when a buffer is allocated remotely. virtual int OnBufferAllocated() = 0; Loading Loading @@ -202,6 +204,10 @@ class BufferHubQueue : public pdx::Client { // prevent the buffer from being deleted. RingBuffer<BufferInfo> available_buffers_; // Fences (acquire fence for consumer and release fence for consumer) , one // for each buffer slot. std::vector<LocalHandle> fences_; // Keep track with how many buffers have been added into the queue. size_t capacity_; Loading Loading @@ -274,7 +280,8 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, // and caller should call Post() once it's done writing to release the buffer // to the consumer side. std::shared_ptr<BufferProducer> Dequeue(int timeout, size_t* slot); std::shared_ptr<BufferProducer> Dequeue(int timeout, size_t* slot, LocalHandle* release_fence); private: friend BASE; Loading @@ -287,7 +294,8 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { ProducerQueue(size_t meta_size, int usage_set_mask, int usage_clear_mask, int usage_deny_set_mask, int usage_deny_clear_mask); int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) override; int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* release_fence) override; // Producer buffer is always allocated from the client (i.e. local) side. int OnBufferAllocated() override { return 0; } Loading Loading @@ -316,14 +324,11 @@ class ConsumerQueue : public pdx::ClientBase<ConsumerQueue, BufferHubQueue> { // Dequeue() is done with the corect metadata type and size with those used // when the buffer is orignally created. template <typename Meta> std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, Meta* meta) { return Dequeue(timeout, slot, meta, sizeof(*meta)); std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, Meta* meta, LocalHandle* acquire_fence) { return Dequeue(timeout, slot, meta, sizeof(*meta), acquire_fence); } std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta, size_t meta_size); private: friend BASE; Loading @@ -335,9 +340,14 @@ class ConsumerQueue : public pdx::ClientBase<ConsumerQueue, BufferHubQueue> { // consumer. int AddBuffer(const std::shared_ptr<BufferConsumer>& buf, size_t slot); int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf) override; int OnBufferReady(std::shared_ptr<BufferHubBuffer> buf, LocalHandle* acquire_fence) override; int OnBufferAllocated() override; std::shared_ptr<BufferConsumer> Dequeue(int timeout, size_t* slot, void* meta, size_t meta_size, LocalHandle* acquire_fence); }; } // namespace dvr Loading
libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp +28 −19 Original line number Diff line number Diff line Loading @@ -59,12 +59,13 @@ TEST_F(BufferHubQueueTest, TestDequeue) { // But dequeue multiple times. for (size_t i = 0; i < nb_dequeue_times; i++) { size_t slot; auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, p1); size_t mi = i; ASSERT_EQ(p1->Post(LocalHandle(), &mi, sizeof(mi)), 0); size_t mo; auto c1 = consumer_queue_->Dequeue(100, &slot, &mo); auto c1 = consumer_queue_->Dequeue(100, &slot, &mo, &fence); ASSERT_NE(nullptr, c1); ASSERT_EQ(mi, mo); c1->Release(LocalHandle()); Loading @@ -91,19 +92,21 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) { ASSERT_EQ(consumer_queue_->capacity(), i); // Dequeue returns nullptr since no buffer is ready to consumer, but // this implicitly triggers buffer import and bump up |capacity|. auto consumer_null = consumer_queue_->Dequeue(0, &slot, &seq); LocalHandle fence; auto consumer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence); ASSERT_EQ(nullptr, consumer_null); ASSERT_EQ(consumer_queue_->capacity(), i + 1); } for (size_t i = 0; i < nb_buffer; i++) { LocalHandle fence; // First time, there is no buffer available to dequeue. auto buffer_null = consumer_queue_->Dequeue(0, &slot, &seq); auto buffer_null = consumer_queue_->Dequeue(0, &slot, &seq, &fence); ASSERT_EQ(nullptr, buffer_null); // Make sure Producer buffer is Post()'ed so that it's ready to Accquire // in the consumer's Dequeue() function. auto producer = producer_queue_->Dequeue(0, &slot); auto producer = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, producer); uint64_t seq_in = static_cast<uint64_t>(i); Loading @@ -111,7 +114,7 @@ TEST_F(BufferHubQueueTest, TestProducerConsumer) { // Second time, the just |Post()|'ed buffer should be dequeued. uint64_t seq_out = 0; auto consumer = consumer_queue_->Dequeue(0, &slot, &seq_out); auto consumer = consumer_queue_->Dequeue(0, &slot, &seq_out, &fence); ASSERT_NE(nullptr, consumer); ASSERT_EQ(seq_in, seq_out); } Loading @@ -132,11 +135,12 @@ TEST_F(BufferHubQueueTest, TestMetadata) { for (auto mi : ms) { size_t slot; auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, p1); ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0); TestMetadata mo; auto c1 = consumer_queue_->Dequeue(0, &slot, &mo); auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence); ASSERT_EQ(mi.a, mo.a); ASSERT_EQ(mi.b, mo.b); ASSERT_EQ(mi.c, mo.c); Loading @@ -150,13 +154,14 @@ TEST_F(BufferHubQueueTest, TestMetadataMismatch) { int64_t mi = 3; size_t slot; auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, p1); ASSERT_EQ(p1->Post(LocalHandle(-1), &mi, sizeof(mi)), 0); int32_t mo; // Acquire a buffer with mismatched metadata is not OK. auto c1 = consumer_queue_->Dequeue(0, &slot, &mo); auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence); ASSERT_EQ(nullptr, c1); } Loading @@ -165,12 +170,13 @@ TEST_F(BufferHubQueueTest, TestEnqueue) { AllocateBuffer(); size_t slot; auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_NE(nullptr, p1); int64_t mo; producer_queue_->Enqueue(p1, slot); auto c1 = consumer_queue_->Dequeue(0, &slot, &mo); auto c1 = consumer_queue_->Dequeue(0, &slot, &mo, &fence); ASSERT_EQ(nullptr, c1); } Loading @@ -179,12 +185,13 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { size_t s1; AllocateBuffer(); auto p1 = producer_queue_->Dequeue(0, &s1); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &s1, &fence); ASSERT_NE(nullptr, p1); // producer queue is exhausted size_t s2; auto p2_null = producer_queue_->Dequeue(0, &s2); auto p2_null = producer_queue_->Dequeue(0, &s2, &fence); ASSERT_EQ(nullptr, p2_null); // dynamically add buffer. Loading @@ -193,7 +200,7 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { ASSERT_EQ(producer_queue_->capacity(), 2U); // now we can dequeue again auto p2 = producer_queue_->Dequeue(0, &s2); auto p2 = producer_queue_->Dequeue(0, &s2, &fence); ASSERT_NE(nullptr, p2); ASSERT_EQ(producer_queue_->count(), 0U); // p1 and p2 should have different slot number Loading @@ -206,14 +213,14 @@ TEST_F(BufferHubQueueTest, TestAllocateBuffer) { int64_t seq = 1; ASSERT_EQ(p1->Post(LocalHandle(), seq), 0); size_t cs1, cs2; auto c1 = consumer_queue_->Dequeue(0, &cs1, &seq); auto c1 = consumer_queue_->Dequeue(0, &cs1, &seq, &fence); ASSERT_NE(nullptr, c1); ASSERT_EQ(consumer_queue_->count(), 0U); ASSERT_EQ(consumer_queue_->capacity(), 2U); ASSERT_EQ(cs1, s1); ASSERT_EQ(p2->Post(LocalHandle(), seq), 0); auto c2 = consumer_queue_->Dequeue(0, &cs2, &seq); auto c2 = consumer_queue_->Dequeue(0, &cs2, &seq, &fence); ASSERT_NE(nullptr, c2); ASSERT_EQ(cs2, s2); } Loading @@ -229,7 +236,8 @@ TEST_F(BufferHubQueueTest, TestUsageSetMask) { kBufferSliceCount, &slot); ASSERT_EQ(ret, 0); auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_EQ(p1->usage() & set_mask, set_mask); } Loading @@ -244,7 +252,8 @@ TEST_F(BufferHubQueueTest, TestUsageClearMask) { kBufferSliceCount, &slot); ASSERT_EQ(ret, 0); auto p1 = producer_queue_->Dequeue(0, &slot); LocalHandle fence; auto p1 = producer_queue_->Dequeue(0, &slot, &fence); ASSERT_EQ(p1->usage() & clear_mask, 0); } Loading
libs/vr/libvrflinger/video_compositor.cpp +3 −1 Original line number Diff line number Diff line Loading @@ -79,7 +79,9 @@ GLuint VideoCompositor::GetActiveTextureId(EGLDisplay display) { // queued in order from the producer side. // TODO(jwcai) Use |metadata.timestamp_ns| to schedule video frames // accurately. auto buffer_consumer = consumer_queue_->Dequeue(0, &slot, &metadata); LocalHandle acquire_fence; auto buffer_consumer = consumer_queue_->Dequeue(0, &slot, &metadata, &acquire_fence); if (buffer_consumer) { // Create a new texture if it hasn't been created yet, or the same slot Loading