Loading libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp +109 −61 Original line number Diff line number Diff line Loading @@ -69,7 +69,7 @@ void BufferHubQueue::Initialize() { .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}}; ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event); if (ret < 0) { ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s", ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__, strerror(-ret)); } } Loading @@ -77,7 +77,7 @@ void BufferHubQueue::Initialize() { Status<void> BufferHubQueue::ImportQueue() { auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>(); if (!status) { ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s", ALOGE("%s: Failed to import queue: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return ErrorStatus(status.error()); } else { Loading Loading @@ -136,9 +136,7 @@ BufferHubQueue::CreateConsumerQueueParcelable(bool silent) { consumer_queue->GetChannel()->TakeChannelParcelable()); if (!queue_parcelable.IsValid()) { ALOGE( "BufferHubQueue::CreateConsumerQueueParcelable: Failed to create " "consumer queue parcelable."); ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__); return ErrorStatus(EINVAL); } Loading Loading @@ -169,8 +167,7 @@ bool BufferHubQueue::WaitForBuffers(int timeout) { } if (ret < 0 && ret != -EINTR) { ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s", strerror(-ret)); ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret)); return false; } Loading Loading @@ -264,14 +261,14 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { // wait will be tried again to acquire the newly imported buffer. auto buffer_status = OnBufferAllocated(); if (!buffer_status) { ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s", ALOGE("%s: Failed to import buffer: %s", __FUNCTION__, buffer_status.GetErrorMessage().c_str()); } } else if (events & EPOLLHUP) { ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!"); ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__); hung_up_ = true; } else { ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events); ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events); } return {}; Loading @@ -279,12 +276,11 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { Status<void> BufferHubQueue::AddBuffer( const std::shared_ptr<BufferHubBase>& buffer, size_t slot) { ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu", buffer->id(), slot); ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(), slot); if (is_full()) { ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu", capacity_); ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_); return ErrorStatus(E2BIG); } Loading @@ -303,7 +299,7 @@ Status<void> BufferHubQueue::AddBuffer( const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event); if (ret < 0) { ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s", ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__, strerror(-ret)); return ErrorStatus(-ret); } Loading @@ -315,16 +311,14 @@ Status<void> BufferHubQueue::AddBuffer( } Status<void> BufferHubQueue::RemoveBuffer(size_t slot) { ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot); ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot); if (buffers_[slot]) { for (const auto& event_source : buffers_[slot]->GetEventSources()) { const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr); if (ret < 0) { ALOGE( "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll " "set: %s", ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__, strerror(-ret)); return ErrorStatus(-ret); } Loading @@ -345,23 +339,31 @@ Status<void> BufferHubQueue::Enqueue(Entry entry) { if (!is_full()) { available_buffers_.push(std::move(entry)); // Find and remove the enqueued buffer from unavailable_buffers_slot if // exist. auto enqueued_buffer_iter = std::find_if( unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(), [&entry](size_t slot) -> bool { return slot == entry.slot; }); if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) { unavailable_buffers_slot_.erase(enqueued_buffer_iter); } // Trigger OnBufferAvailable callback if registered. if (on_buffer_available_) on_buffer_available_(); return {}; } else { ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!"); ALOGE("%s: Buffer queue is full!", __FUNCTION__); return ErrorStatus(E2BIG); } } Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout, size_t* slot) { ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(), timeout); ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout); PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count()); PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count()); if (count() == 0) { if (!WaitForBuffers(timeout)) Loading @@ -376,6 +378,7 @@ Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout, *slot = entry.slot; available_buffers_.pop(); unavailable_buffers_slot_.push_back(*slot); return {std::move(buffer)}; } Loading Loading @@ -564,7 +567,7 @@ Status<void> ProducerQueue::RemoveBuffer(size_t slot) { auto status = InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot); if (!status) { ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s", ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return status.error_status(); } Loading @@ -580,31 +583,81 @@ Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, pdx::LocalHandle* release_fence) { pdx::LocalHandle* release_fence, bool gain_posted_buffer) { ATRACE_NAME("ProducerQueue::Dequeue"); if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) { ALOGE("ProducerQueue::Dequeue: Invalid parameter."); ALOGE("%s: Invalid parameter.", __FUNCTION__); return ErrorStatus(EINVAL); } auto status = BufferHubQueue::Dequeue(timeout, slot); if (!status) return status.error_status(); auto buffer = std::static_pointer_cast<BufferProducer>(status.take()); const int ret = buffer->GainAsync(out_meta, release_fence); std::shared_ptr<BufferProducer> buffer; Status<std::shared_ptr<BufferHubBase>> dequeue_status = BufferHubQueue::Dequeue(timeout, slot); if (dequeue_status.ok()) { buffer = std::static_pointer_cast<BufferProducer>(dequeue_status.take()); } else { if (gain_posted_buffer) { Status<std::shared_ptr<BufferProducer>> dequeue_unacquired_status = ProducerQueue::DequeueUnacquiredBuffer(slot); if (!dequeue_unacquired_status.ok()) { ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__, dequeue_unacquired_status.error()); return dequeue_unacquired_status.error_status(); } buffer = dequeue_unacquired_status.take(); } else { return dequeue_status.error_status(); } } const int ret = buffer->GainAsync(out_meta, release_fence, gain_posted_buffer); if (ret < 0 && ret != -EALREADY) return ErrorStatus(-ret); return {std::move(buffer)}; } Status<std::shared_ptr<BufferProducer>> ProducerQueue::DequeueUnacquiredBuffer( size_t* slot) { if (unavailable_buffers_slot_.size() < 1) { ALOGE( "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in " "acquired state if exist.", __FUNCTION__); return ErrorStatus(ENOMEM); } // Find the first buffer that is not in acquired state from // unavailable_buffers_slot_. for (auto iter = unavailable_buffers_slot_.begin(); iter != unavailable_buffers_slot_.end(); iter++) { std::shared_ptr<BufferProducer> buffer = ProducerQueue::GetBuffer(*iter); if (buffer == nullptr) { ALOGE("%s failed. Buffer slot %d is null.", __FUNCTION__, static_cast<int>(*slot)); return ErrorStatus(EIO); } if (!BufferHubDefs::IsBufferAcquired(buffer->buffer_state())) { *slot = *iter; unavailable_buffers_slot_.erase(iter); unavailable_buffers_slot_.push_back(*slot); ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d", __FUNCTION__, static_cast<int>(*slot)); return {std::move(buffer)}; } } ALOGE( "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.", __FUNCTION__); return ErrorStatus(EBUSY); } pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() { if (capacity() != 0) { ALOGE( "ProducerQueue::TakeAsParcelable: producer queue can only be taken out" " as a parcelable when empty. Current queue capacity: %zu", capacity()); "%s: producer queue can only be taken out as a parcelable when empty. " "Current queue capacity: %zu", __FUNCTION__, capacity()); return ErrorStatus(EINVAL); } Loading @@ -628,17 +681,16 @@ ConsumerQueue::ConsumerQueue(LocalChannelHandle handle) : BufferHubQueue(std::move(handle)) { auto status = ImportQueue(); if (!status) { ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s", ALOGE("%s: Failed to import queue: %s", __FUNCTION__, status.GetErrorMessage().c_str()); Close(-status.error()); } auto import_status = ImportBuffers(); if (import_status) { ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.", import_status.get()); ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get()); } else { ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s", ALOGE("%s: Failed to import buffers: %s", __FUNCTION__, import_status.GetErrorMessage().c_str()); } } Loading @@ -647,13 +699,10 @@ Status<size_t> ConsumerQueue::ImportBuffers() { auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(); if (!status) { if (status.error() == EBADR) { ALOGI( "ConsumerQueue::ImportBuffers: Queue is silent, no buffers " "imported."); ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__); return {0}; } else { ALOGE( "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s", ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return status.error_status(); } Loading @@ -665,13 +714,13 @@ Status<size_t> ConsumerQueue::ImportBuffers() { auto buffer_handle_slots = status.take(); for (auto& buffer_handle_slot : buffer_handle_slots) { ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d", ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__, buffer_handle_slot.first.value()); std::unique_ptr<BufferConsumer> buffer_consumer = BufferConsumer::Import(std::move(buffer_handle_slot.first)); if (!buffer_consumer) { ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu", ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__, buffer_handle_slot.second); last_error = ErrorStatus(EPIPE); continue; Loading @@ -680,7 +729,7 @@ Status<size_t> ConsumerQueue::ImportBuffers() { auto add_status = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second); if (!add_status) { ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s", ALOGE("%s: Failed to add buffer: %s", __FUNCTION__, add_status.GetErrorMessage().c_str()); last_error = add_status; } else { Loading @@ -696,8 +745,8 @@ Status<size_t> ConsumerQueue::ImportBuffers() { Status<void> ConsumerQueue::AddBuffer( const std::shared_ptr<BufferConsumer>& buffer, size_t slot) { ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu", id(), buffer->id(), slot); ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(), buffer->id(), slot); return BufferHubQueue::AddBuffer(buffer, slot); } Loading @@ -706,9 +755,9 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( LocalHandle* acquire_fence) { if (user_metadata_size != user_metadata_size_) { ALOGE( "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer " "does not match metadata size (%zu) for the queue.", user_metadata_size, user_metadata_size_); "%s: Metadata size (%zu) for the dequeuing buffer does not match " "metadata size (%zu) for the queue.", __FUNCTION__, user_metadata_size, user_metadata_size_); return ErrorStatus(EINVAL); } Loading @@ -723,7 +772,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( if (metadata_src) { memcpy(meta, metadata_src, user_metadata_size); } else { ALOGW("ConsumerQueue::Dequeue: no user-defined metadata."); ALOGW("%s: no user-defined metadata.", __FUNCTION__); } } Loading @@ -735,7 +784,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( pdx::LocalHandle* acquire_fence) { ATRACE_NAME("ConsumerQueue::Dequeue"); if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) { ALOGE("ConsumerQueue::Dequeue: Invalid parameter."); ALOGE("%s: Invalid parameter.", __FUNCTION__); return ErrorStatus(EINVAL); } Loading @@ -752,19 +801,18 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( } Status<void> ConsumerQueue::OnBufferAllocated() { ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id()); ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id()); auto status = ImportBuffers(); if (!status) { ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s", ALOGE("%s: Failed to import buffers: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return ErrorStatus(status.error()); } else if (status.get() == 0) { ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!"); ALOGW("%s: No new buffers allocated!", __FUNCTION__); return ErrorStatus(ENOBUFS); } else { ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.", ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__, status.get()); return {}; } Loading libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h +40 −7 Original line number Diff line number Diff line Loading @@ -57,10 +57,10 @@ class BufferHubQueue : public pdx::Client { uint32_t default_width() const { return default_width_; } // Returns the default buffer height of this buffer queue. uint32_t default_height() const { return static_cast<uint32_t>(default_height_); } uint32_t default_height() const { return default_height_; } // Returns the default buffer format of this buffer queue. uint32_t default_format() const { return static_cast<uint32_t>(default_format_); } uint32_t default_format() const { return default_format_; } // Creates a new consumer in handle form for immediate transport over RPC. pdx::Status<pdx::LocalChannelHandle> CreateConsumerQueueHandle( Loading Loading @@ -208,6 +208,14 @@ class BufferHubQueue : public pdx::Client { // Size of the metadata that buffers in this queue cary. size_t user_metadata_size_{0}; // Buffers and related data that are available for dequeue. std::priority_queue<Entry, std::vector<Entry>, EntryComparator> available_buffers_; // Slot of the buffers that are not available for normal dequeue. For example, // the slot of posted or acquired buffers in the perspective of a producer. std::vector<size_t> unavailable_buffers_slot_; private: void Initialize(); Loading Loading @@ -252,10 +260,6 @@ class BufferHubQueue : public pdx::Client { // queue regardless of its queue position or presence in the ring buffer. std::array<std::shared_ptr<BufferHubBase>, kMaxQueueCapacity> buffers_; // Buffers and related data that are available for dequeue. std::priority_queue<Entry, std::vector<Entry>, EntryComparator> available_buffers_; // Keeps track with how many buffers have been added into the queue. size_t capacity_{0}; Loading Loading @@ -349,11 +353,30 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, // and caller should call Post() once it's done writing to release the buffer // to the consumer side. // @return a buffer in gained state, which was originally in released state. pdx::Status<std::shared_ptr<BufferProducer>> Dequeue( int timeout, size_t* slot, pdx::LocalHandle* release_fence); // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, // and caller should call Post() once it's done writing to release the buffer // to the consumer side. // // @param timeout to dequeue a buffer. // @param slot is the slot of the output BufferProducer. // @param release_fence for gaining a buffer. // @param out_meta metadata of the output buffer. // @param gain_posted_buffer whether to gain posted buffer if no released // buffer is available to gain. // @return a buffer in gained state, which was originally in released state if // gain_posted_buffer is false, or in posted/released state if // gain_posted_buffer is true. // TODO(b/112007999): gain_posted_buffer true is only used to prevent // libdvrtracking from starving when there are non-responding clients. This // gain_posted_buffer param can be removed once libdvrtracking start to use // the new AHardwareBuffer API. pdx::Status<std::shared_ptr<BufferProducer>> Dequeue( int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, pdx::LocalHandle* release_fence); pdx::LocalHandle* release_fence, bool gain_posted_buffer = false); // Enqueues a producer buffer in the queue. pdx::Status<void> Enqueue(const std::shared_ptr<BufferProducer>& buffer, Loading @@ -374,6 +397,16 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // arguments as the constructors. explicit ProducerQueue(pdx::LocalChannelHandle handle); ProducerQueue(const ProducerQueueConfig& config, const UsagePolicy& usage); // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, // and caller should call Post() once it's done writing to release the buffer // to the consumer side. // // @param slot the slot of the returned buffer. // @return a buffer in gained state, which was originally in posted state or // released state. pdx::Status<std::shared_ptr<BufferProducer>> DequeueUnacquiredBuffer( size_t* slot); }; class ConsumerQueue : public BufferHubQueue { Loading libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp +144 −2 Original line number Diff line number Diff line #include <base/logging.h> #include <binder/Parcel.h> #include <dvr/dvr_api.h> #include <private/dvr/buffer_hub_client.h> #include <private/dvr/buffer_hub_queue_client.h> Loading Loading @@ -122,6 +123,147 @@ TEST_F(BufferHubQueueTest, TestDequeue) { } } TEST_F(BufferHubQueueTest, TestDequeuePostedBufferIfNoAvailableReleasedBuffer_withBufferConsumer) { ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{})); // Allocate 3 buffers to use. const size_t test_queue_capacity = 3; for (int64_t i = 0; i < test_queue_capacity; i++) { AllocateBuffer(); } EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity); size_t producer_slot, consumer_slot; LocalHandle fence; DvrNativeBufferMetadata mi, mo; // Producer posts 2 buffers and remember their posted sequence. std::deque<size_t> posted_slots; for (int64_t i = 0; i < 2; i++) { auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // Producer should not be gaining posted buffer when there are still // available buffers to gain. auto found_iter = std::find(posted_slots.begin(), posted_slots.end(), producer_slot); EXPECT_EQ(found_iter, posted_slots.end()); posted_slots.push_back(producer_slot); // Producer posts the buffer. mi.index = i; EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle())); } // Consumer acquires one buffer. auto c1_status = consumer_queue_->Dequeue(kTimeoutMs, &consumer_slot, &mo, &fence); EXPECT_TRUE(c1_status.ok()); auto c1 = c1_status.take(); ASSERT_NE(c1, nullptr); // Consumer should get the oldest posted buffer. No checks here. // posted_slots[0] should be in acquired state now. EXPECT_EQ(mo.index, 0); // Consumer releases the buffer. EXPECT_EQ(c1->ReleaseAsync(&mi, LocalHandle()), 0); // posted_slots[0] should be in released state now. // Producer gain and post 2 buffers. for (int64_t i = 0; i < 2; i++) { auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // The gained buffer should be the one in released state or the one haven't // been use. EXPECT_NE(posted_slots[1], producer_slot); mi.index = i + 2; EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle())); } // Producer gains a buffer. auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // The gained buffer should be the oldest posted buffer. EXPECT_EQ(posted_slots[1], producer_slot); // Producer posts the buffer. mi.index = 4; EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle())); } TEST_F(BufferHubQueueTest, TestDequeuePostedBufferIfNoAvailableReleasedBuffer_noBufferConsumer) { ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{})); // Allocate 4 buffers to use. const size_t test_queue_capacity = 4; for (int64_t i = 0; i < test_queue_capacity; i++) { AllocateBuffer(); } EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity); // Post all allowed buffers and remember their posted sequence. std::deque<size_t> posted_slots; for (int64_t i = 0; i < test_queue_capacity; i++) { size_t slot; LocalHandle fence; DvrNativeBufferMetadata mi, mo; // Producer gains a buffer. auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // Producer should not be gaining posted buffer when there are still // available buffers to gain. auto found_iter = std::find(posted_slots.begin(), posted_slots.end(), slot); EXPECT_EQ(found_iter, posted_slots.end()); posted_slots.push_back(slot); // Producer posts the buffer. mi.index = i; EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0); } // Gain posted buffers in sequence. const int64_t nb_dequeue_all_times = 2; for (int j = 0; j < nb_dequeue_all_times; ++j) { for (int i = 0; i < test_queue_capacity; ++i) { size_t slot; LocalHandle fence; DvrNativeBufferMetadata mi, mo; // Producer gains a buffer. auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // The gained buffer should be the oldest posted buffer. EXPECT_EQ(posted_slots[i], slot); // Producer posts the buffer. mi.index = i + test_queue_capacity * (j + 1); EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0); } } } TEST_F(BufferHubQueueTest, TestProducerConsumer) { const size_t kBufferCount = 16; size_t slot; Loading Loading @@ -245,8 +387,8 @@ TEST_F(BufferHubQueueTest, TestRemoveBuffer) { for (size_t i = 0; i < kBufferCount; i++) { Entry* entry = &buffers[i]; auto producer_status = producer_queue_->Dequeue( kTimeoutMs, &entry->slot, &mo, &entry->fence); auto producer_status = producer_queue_->Dequeue(kTimeoutMs, &entry->slot, &mo, &entry->fence); ASSERT_TRUE(producer_status.ok()); entry->buffer = producer_status.take(); ASSERT_NE(nullptr, entry->buffer); Loading Loading
libs/vr/libbufferhubqueue/buffer_hub_queue_client.cpp +109 −61 Original line number Diff line number Diff line Loading @@ -69,7 +69,7 @@ void BufferHubQueue::Initialize() { .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}}; ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event); if (ret < 0) { ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s", ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__, strerror(-ret)); } } Loading @@ -77,7 +77,7 @@ void BufferHubQueue::Initialize() { Status<void> BufferHubQueue::ImportQueue() { auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>(); if (!status) { ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s", ALOGE("%s: Failed to import queue: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return ErrorStatus(status.error()); } else { Loading Loading @@ -136,9 +136,7 @@ BufferHubQueue::CreateConsumerQueueParcelable(bool silent) { consumer_queue->GetChannel()->TakeChannelParcelable()); if (!queue_parcelable.IsValid()) { ALOGE( "BufferHubQueue::CreateConsumerQueueParcelable: Failed to create " "consumer queue parcelable."); ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__); return ErrorStatus(EINVAL); } Loading Loading @@ -169,8 +167,7 @@ bool BufferHubQueue::WaitForBuffers(int timeout) { } if (ret < 0 && ret != -EINTR) { ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s", strerror(-ret)); ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret)); return false; } Loading Loading @@ -264,14 +261,14 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { // wait will be tried again to acquire the newly imported buffer. auto buffer_status = OnBufferAllocated(); if (!buffer_status) { ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s", ALOGE("%s: Failed to import buffer: %s", __FUNCTION__, buffer_status.GetErrorMessage().c_str()); } } else if (events & EPOLLHUP) { ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!"); ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__); hung_up_ = true; } else { ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events); ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events); } return {}; Loading @@ -279,12 +276,11 @@ Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { Status<void> BufferHubQueue::AddBuffer( const std::shared_ptr<BufferHubBase>& buffer, size_t slot) { ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu", buffer->id(), slot); ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(), slot); if (is_full()) { ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu", capacity_); ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_); return ErrorStatus(E2BIG); } Loading @@ -303,7 +299,7 @@ Status<void> BufferHubQueue::AddBuffer( const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event); if (ret < 0) { ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s", ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__, strerror(-ret)); return ErrorStatus(-ret); } Loading @@ -315,16 +311,14 @@ Status<void> BufferHubQueue::AddBuffer( } Status<void> BufferHubQueue::RemoveBuffer(size_t slot) { ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot); ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot); if (buffers_[slot]) { for (const auto& event_source : buffers_[slot]->GetEventSources()) { const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr); if (ret < 0) { ALOGE( "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll " "set: %s", ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__, strerror(-ret)); return ErrorStatus(-ret); } Loading @@ -345,23 +339,31 @@ Status<void> BufferHubQueue::Enqueue(Entry entry) { if (!is_full()) { available_buffers_.push(std::move(entry)); // Find and remove the enqueued buffer from unavailable_buffers_slot if // exist. auto enqueued_buffer_iter = std::find_if( unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(), [&entry](size_t slot) -> bool { return slot == entry.slot; }); if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) { unavailable_buffers_slot_.erase(enqueued_buffer_iter); } // Trigger OnBufferAvailable callback if registered. if (on_buffer_available_) on_buffer_available_(); return {}; } else { ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!"); ALOGE("%s: Buffer queue is full!", __FUNCTION__); return ErrorStatus(E2BIG); } } Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout, size_t* slot) { ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(), timeout); ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout); PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count()); PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count()); if (count() == 0) { if (!WaitForBuffers(timeout)) Loading @@ -376,6 +378,7 @@ Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout, *slot = entry.slot; available_buffers_.pop(); unavailable_buffers_slot_.push_back(*slot); return {std::move(buffer)}; } Loading Loading @@ -564,7 +567,7 @@ Status<void> ProducerQueue::RemoveBuffer(size_t slot) { auto status = InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot); if (!status) { ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s", ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return status.error_status(); } Loading @@ -580,31 +583,81 @@ Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, pdx::LocalHandle* release_fence) { pdx::LocalHandle* release_fence, bool gain_posted_buffer) { ATRACE_NAME("ProducerQueue::Dequeue"); if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) { ALOGE("ProducerQueue::Dequeue: Invalid parameter."); ALOGE("%s: Invalid parameter.", __FUNCTION__); return ErrorStatus(EINVAL); } auto status = BufferHubQueue::Dequeue(timeout, slot); if (!status) return status.error_status(); auto buffer = std::static_pointer_cast<BufferProducer>(status.take()); const int ret = buffer->GainAsync(out_meta, release_fence); std::shared_ptr<BufferProducer> buffer; Status<std::shared_ptr<BufferHubBase>> dequeue_status = BufferHubQueue::Dequeue(timeout, slot); if (dequeue_status.ok()) { buffer = std::static_pointer_cast<BufferProducer>(dequeue_status.take()); } else { if (gain_posted_buffer) { Status<std::shared_ptr<BufferProducer>> dequeue_unacquired_status = ProducerQueue::DequeueUnacquiredBuffer(slot); if (!dequeue_unacquired_status.ok()) { ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__, dequeue_unacquired_status.error()); return dequeue_unacquired_status.error_status(); } buffer = dequeue_unacquired_status.take(); } else { return dequeue_status.error_status(); } } const int ret = buffer->GainAsync(out_meta, release_fence, gain_posted_buffer); if (ret < 0 && ret != -EALREADY) return ErrorStatus(-ret); return {std::move(buffer)}; } Status<std::shared_ptr<BufferProducer>> ProducerQueue::DequeueUnacquiredBuffer( size_t* slot) { if (unavailable_buffers_slot_.size() < 1) { ALOGE( "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in " "acquired state if exist.", __FUNCTION__); return ErrorStatus(ENOMEM); } // Find the first buffer that is not in acquired state from // unavailable_buffers_slot_. for (auto iter = unavailable_buffers_slot_.begin(); iter != unavailable_buffers_slot_.end(); iter++) { std::shared_ptr<BufferProducer> buffer = ProducerQueue::GetBuffer(*iter); if (buffer == nullptr) { ALOGE("%s failed. Buffer slot %d is null.", __FUNCTION__, static_cast<int>(*slot)); return ErrorStatus(EIO); } if (!BufferHubDefs::IsBufferAcquired(buffer->buffer_state())) { *slot = *iter; unavailable_buffers_slot_.erase(iter); unavailable_buffers_slot_.push_back(*slot); ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d", __FUNCTION__, static_cast<int>(*slot)); return {std::move(buffer)}; } } ALOGE( "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.", __FUNCTION__); return ErrorStatus(EBUSY); } pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() { if (capacity() != 0) { ALOGE( "ProducerQueue::TakeAsParcelable: producer queue can only be taken out" " as a parcelable when empty. Current queue capacity: %zu", capacity()); "%s: producer queue can only be taken out as a parcelable when empty. " "Current queue capacity: %zu", __FUNCTION__, capacity()); return ErrorStatus(EINVAL); } Loading @@ -628,17 +681,16 @@ ConsumerQueue::ConsumerQueue(LocalChannelHandle handle) : BufferHubQueue(std::move(handle)) { auto status = ImportQueue(); if (!status) { ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s", ALOGE("%s: Failed to import queue: %s", __FUNCTION__, status.GetErrorMessage().c_str()); Close(-status.error()); } auto import_status = ImportBuffers(); if (import_status) { ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.", import_status.get()); ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get()); } else { ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s", ALOGE("%s: Failed to import buffers: %s", __FUNCTION__, import_status.GetErrorMessage().c_str()); } } Loading @@ -647,13 +699,10 @@ Status<size_t> ConsumerQueue::ImportBuffers() { auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(); if (!status) { if (status.error() == EBADR) { ALOGI( "ConsumerQueue::ImportBuffers: Queue is silent, no buffers " "imported."); ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__); return {0}; } else { ALOGE( "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s", ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return status.error_status(); } Loading @@ -665,13 +714,13 @@ Status<size_t> ConsumerQueue::ImportBuffers() { auto buffer_handle_slots = status.take(); for (auto& buffer_handle_slot : buffer_handle_slots) { ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d", ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__, buffer_handle_slot.first.value()); std::unique_ptr<BufferConsumer> buffer_consumer = BufferConsumer::Import(std::move(buffer_handle_slot.first)); if (!buffer_consumer) { ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu", ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__, buffer_handle_slot.second); last_error = ErrorStatus(EPIPE); continue; Loading @@ -680,7 +729,7 @@ Status<size_t> ConsumerQueue::ImportBuffers() { auto add_status = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second); if (!add_status) { ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s", ALOGE("%s: Failed to add buffer: %s", __FUNCTION__, add_status.GetErrorMessage().c_str()); last_error = add_status; } else { Loading @@ -696,8 +745,8 @@ Status<size_t> ConsumerQueue::ImportBuffers() { Status<void> ConsumerQueue::AddBuffer( const std::shared_ptr<BufferConsumer>& buffer, size_t slot) { ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu", id(), buffer->id(), slot); ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(), buffer->id(), slot); return BufferHubQueue::AddBuffer(buffer, slot); } Loading @@ -706,9 +755,9 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( LocalHandle* acquire_fence) { if (user_metadata_size != user_metadata_size_) { ALOGE( "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer " "does not match metadata size (%zu) for the queue.", user_metadata_size, user_metadata_size_); "%s: Metadata size (%zu) for the dequeuing buffer does not match " "metadata size (%zu) for the queue.", __FUNCTION__, user_metadata_size, user_metadata_size_); return ErrorStatus(EINVAL); } Loading @@ -723,7 +772,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( if (metadata_src) { memcpy(meta, metadata_src, user_metadata_size); } else { ALOGW("ConsumerQueue::Dequeue: no user-defined metadata."); ALOGW("%s: no user-defined metadata.", __FUNCTION__); } } Loading @@ -735,7 +784,7 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( pdx::LocalHandle* acquire_fence) { ATRACE_NAME("ConsumerQueue::Dequeue"); if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) { ALOGE("ConsumerQueue::Dequeue: Invalid parameter."); ALOGE("%s: Invalid parameter.", __FUNCTION__); return ErrorStatus(EINVAL); } Loading @@ -752,19 +801,18 @@ Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( } Status<void> ConsumerQueue::OnBufferAllocated() { ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id()); ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id()); auto status = ImportBuffers(); if (!status) { ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s", ALOGE("%s: Failed to import buffers: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return ErrorStatus(status.error()); } else if (status.get() == 0) { ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!"); ALOGW("%s: No new buffers allocated!", __FUNCTION__); return ErrorStatus(ENOBUFS); } else { ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.", ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__, status.get()); return {}; } Loading
libs/vr/libbufferhubqueue/include/private/dvr/buffer_hub_queue_client.h +40 −7 Original line number Diff line number Diff line Loading @@ -57,10 +57,10 @@ class BufferHubQueue : public pdx::Client { uint32_t default_width() const { return default_width_; } // Returns the default buffer height of this buffer queue. uint32_t default_height() const { return static_cast<uint32_t>(default_height_); } uint32_t default_height() const { return default_height_; } // Returns the default buffer format of this buffer queue. uint32_t default_format() const { return static_cast<uint32_t>(default_format_); } uint32_t default_format() const { return default_format_; } // Creates a new consumer in handle form for immediate transport over RPC. pdx::Status<pdx::LocalChannelHandle> CreateConsumerQueueHandle( Loading Loading @@ -208,6 +208,14 @@ class BufferHubQueue : public pdx::Client { // Size of the metadata that buffers in this queue cary. size_t user_metadata_size_{0}; // Buffers and related data that are available for dequeue. std::priority_queue<Entry, std::vector<Entry>, EntryComparator> available_buffers_; // Slot of the buffers that are not available for normal dequeue. For example, // the slot of posted or acquired buffers in the perspective of a producer. std::vector<size_t> unavailable_buffers_slot_; private: void Initialize(); Loading Loading @@ -252,10 +260,6 @@ class BufferHubQueue : public pdx::Client { // queue regardless of its queue position or presence in the ring buffer. std::array<std::shared_ptr<BufferHubBase>, kMaxQueueCapacity> buffers_; // Buffers and related data that are available for dequeue. std::priority_queue<Entry, std::vector<Entry>, EntryComparator> available_buffers_; // Keeps track with how many buffers have been added into the queue. size_t capacity_{0}; Loading Loading @@ -349,11 +353,30 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, // and caller should call Post() once it's done writing to release the buffer // to the consumer side. // @return a buffer in gained state, which was originally in released state. pdx::Status<std::shared_ptr<BufferProducer>> Dequeue( int timeout, size_t* slot, pdx::LocalHandle* release_fence); // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, // and caller should call Post() once it's done writing to release the buffer // to the consumer side. // // @param timeout to dequeue a buffer. // @param slot is the slot of the output BufferProducer. // @param release_fence for gaining a buffer. // @param out_meta metadata of the output buffer. // @param gain_posted_buffer whether to gain posted buffer if no released // buffer is available to gain. // @return a buffer in gained state, which was originally in released state if // gain_posted_buffer is false, or in posted/released state if // gain_posted_buffer is true. // TODO(b/112007999): gain_posted_buffer true is only used to prevent // libdvrtracking from starving when there are non-responding clients. This // gain_posted_buffer param can be removed once libdvrtracking start to use // the new AHardwareBuffer API. pdx::Status<std::shared_ptr<BufferProducer>> Dequeue( int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, pdx::LocalHandle* release_fence); pdx::LocalHandle* release_fence, bool gain_posted_buffer = false); // Enqueues a producer buffer in the queue. pdx::Status<void> Enqueue(const std::shared_ptr<BufferProducer>& buffer, Loading @@ -374,6 +397,16 @@ class ProducerQueue : public pdx::ClientBase<ProducerQueue, BufferHubQueue> { // arguments as the constructors. explicit ProducerQueue(pdx::LocalChannelHandle handle); ProducerQueue(const ProducerQueueConfig& config, const UsagePolicy& usage); // Dequeue a producer buffer to write. The returned buffer in |Gain|'ed mode, // and caller should call Post() once it's done writing to release the buffer // to the consumer side. // // @param slot the slot of the returned buffer. // @return a buffer in gained state, which was originally in posted state or // released state. pdx::Status<std::shared_ptr<BufferProducer>> DequeueUnacquiredBuffer( size_t* slot); }; class ConsumerQueue : public BufferHubQueue { Loading
libs/vr/libbufferhubqueue/tests/buffer_hub_queue-test.cpp +144 −2 Original line number Diff line number Diff line #include <base/logging.h> #include <binder/Parcel.h> #include <dvr/dvr_api.h> #include <private/dvr/buffer_hub_client.h> #include <private/dvr/buffer_hub_queue_client.h> Loading Loading @@ -122,6 +123,147 @@ TEST_F(BufferHubQueueTest, TestDequeue) { } } TEST_F(BufferHubQueueTest, TestDequeuePostedBufferIfNoAvailableReleasedBuffer_withBufferConsumer) { ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{})); // Allocate 3 buffers to use. const size_t test_queue_capacity = 3; for (int64_t i = 0; i < test_queue_capacity; i++) { AllocateBuffer(); } EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity); size_t producer_slot, consumer_slot; LocalHandle fence; DvrNativeBufferMetadata mi, mo; // Producer posts 2 buffers and remember their posted sequence. std::deque<size_t> posted_slots; for (int64_t i = 0; i < 2; i++) { auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // Producer should not be gaining posted buffer when there are still // available buffers to gain. auto found_iter = std::find(posted_slots.begin(), posted_slots.end(), producer_slot); EXPECT_EQ(found_iter, posted_slots.end()); posted_slots.push_back(producer_slot); // Producer posts the buffer. mi.index = i; EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle())); } // Consumer acquires one buffer. auto c1_status = consumer_queue_->Dequeue(kTimeoutMs, &consumer_slot, &mo, &fence); EXPECT_TRUE(c1_status.ok()); auto c1 = c1_status.take(); ASSERT_NE(c1, nullptr); // Consumer should get the oldest posted buffer. No checks here. // posted_slots[0] should be in acquired state now. EXPECT_EQ(mo.index, 0); // Consumer releases the buffer. EXPECT_EQ(c1->ReleaseAsync(&mi, LocalHandle()), 0); // posted_slots[0] should be in released state now. // Producer gain and post 2 buffers. for (int64_t i = 0; i < 2; i++) { auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // The gained buffer should be the one in released state or the one haven't // been use. EXPECT_NE(posted_slots[1], producer_slot); mi.index = i + 2; EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle())); } // Producer gains a buffer. auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &producer_slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // The gained buffer should be the oldest posted buffer. EXPECT_EQ(posted_slots[1], producer_slot); // Producer posts the buffer. mi.index = 4; EXPECT_EQ(0, p1->PostAsync(&mi, LocalHandle())); } TEST_F(BufferHubQueueTest, TestDequeuePostedBufferIfNoAvailableReleasedBuffer_noBufferConsumer) { ASSERT_TRUE(CreateQueues(config_builder_.Build(), UsagePolicy{})); // Allocate 4 buffers to use. const size_t test_queue_capacity = 4; for (int64_t i = 0; i < test_queue_capacity; i++) { AllocateBuffer(); } EXPECT_EQ(producer_queue_->capacity(), test_queue_capacity); // Post all allowed buffers and remember their posted sequence. std::deque<size_t> posted_slots; for (int64_t i = 0; i < test_queue_capacity; i++) { size_t slot; LocalHandle fence; DvrNativeBufferMetadata mi, mo; // Producer gains a buffer. auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // Producer should not be gaining posted buffer when there are still // available buffers to gain. auto found_iter = std::find(posted_slots.begin(), posted_slots.end(), slot); EXPECT_EQ(found_iter, posted_slots.end()); posted_slots.push_back(slot); // Producer posts the buffer. mi.index = i; EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0); } // Gain posted buffers in sequence. const int64_t nb_dequeue_all_times = 2; for (int j = 0; j < nb_dequeue_all_times; ++j) { for (int i = 0; i < test_queue_capacity; ++i) { size_t slot; LocalHandle fence; DvrNativeBufferMetadata mi, mo; // Producer gains a buffer. auto p1_status = producer_queue_->Dequeue(kTimeoutMs, &slot, &mo, &fence, true); EXPECT_TRUE(p1_status.ok()); auto p1 = p1_status.take(); ASSERT_NE(p1, nullptr); // The gained buffer should be the oldest posted buffer. EXPECT_EQ(posted_slots[i], slot); // Producer posts the buffer. mi.index = i + test_queue_capacity * (j + 1); EXPECT_EQ(p1->PostAsync(&mi, LocalHandle()), 0); } } } TEST_F(BufferHubQueueTest, TestProducerConsumer) { const size_t kBufferCount = 16; size_t slot; Loading Loading @@ -245,8 +387,8 @@ TEST_F(BufferHubQueueTest, TestRemoveBuffer) { for (size_t i = 0; i < kBufferCount; i++) { Entry* entry = &buffers[i]; auto producer_status = producer_queue_->Dequeue( kTimeoutMs, &entry->slot, &mo, &entry->fence); auto producer_status = producer_queue_->Dequeue(kTimeoutMs, &entry->slot, &mo, &entry->fence); ASSERT_TRUE(producer_status.ok()); entry->buffer = producer_status.take(); ASSERT_NE(nullptr, entry->buffer); Loading