Loading services/vr/bufferhubd/consumer_queue_channel.cpp +31 −23 Original line number Diff line number Diff line Loading @@ -80,27 +80,35 @@ BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const { } void ConsumerQueueChannel::RegisterNewBuffer( const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) { ALOGD_IF(TRACE, "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d " "slot=%zu silent=%d", buffer_id(), producer_channel->buffer_id(), slot, silent_); const std::shared_ptr<ProducerChannel>& producer_channel, size_t producer_slot) { ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d", __FUNCTION__, buffer_id(), producer_channel->buffer_id(), producer_slot, silent_); // Only register buffers if the queue is not silent. if (!silent_) { pending_buffer_slots_.emplace(producer_channel, slot); if (silent_) { return; } auto status = producer_channel->CreateConsumerStateMask(); if (!status.ok()) { ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return; } uint64_t consumer_state_mask = status.get(); pending_buffer_slots_.emplace(producer_channel, producer_slot, consumer_state_mask); // Signal the client that there is new buffer available. SignalAvailable(); } } Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers"); ALOGD_IF(TRACE, "ConsumerQueueChannel::OnConsumerQueueImportBuffers: " "pending_buffer_slots=%zu", ATRACE_NAME(__FUNCTION__); ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__, pending_buffer_slots_.size()); // Indicate this is a silent queue that will not import buffers. Loading @@ -108,29 +116,29 @@ ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { return ErrorStatus(EBADR); while (!pending_buffer_slots_.empty()) { auto producer_channel = pending_buffer_slots_.front().first.lock(); size_t producer_slot = pending_buffer_slots_.front().second; auto producer_channel = pending_buffer_slots_.front().producer_channel.lock(); size_t producer_slot = pending_buffer_slots_.front().producer_slot; uint64_t consumer_state_mask = pending_buffer_slots_.front().consumer_state_mask; pending_buffer_slots_.pop(); // It's possible that the producer channel has expired. When this occurs, // ignore the producer channel. if (producer_channel == nullptr) { ALOGW( "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer " "channel has already been expired."); ALOGW("%s: producer channel has already been expired.", __FUNCTION__); continue; } auto status = producer_channel->CreateConsumer(message); auto status = producer_channel->CreateConsumer(message, consumer_state_mask); // If no buffers are imported successfully, clear available and return an // error. Otherwise, return all consumer handles already imported // successfully, but keep available bits on, so that the client can retry // importing remaining consumer buffers. if (!status) { ALOGE( "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create " "consumer: %s", ALOGE("%s: Failed create consumer: %s", __FUNCTION__, status.GetErrorMessage().c_str()); if (buffer_handles.empty()) { ClearAvailable(); Loading services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h +20 −4 Original line number Diff line number Diff line Loading @@ -3,8 +3,8 @@ #include <queue> #include <private/dvr/bufferhub_rpc.h> #include <private/dvr/buffer_hub.h> #include <private/dvr/bufferhub_rpc.h> #include <private/dvr/consumer_channel.h> #include <private/dvr/producer_queue_channel.h> Loading @@ -28,7 +28,8 @@ class ConsumerQueueChannel : public BufferHubChannel { // Called by ProdcuerQueueChannel to notify consumer queue that a new // buffer has been allocated. void RegisterNewBuffer( const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot); const std::shared_ptr<ProducerChannel>& producer_channel, size_t producer_slot); // Called after clients been signaled by service that new buffer has been // allocated. Clients uses kOpConsumerQueueImportBuffers to import new Loading @@ -40,14 +41,29 @@ class ConsumerQueueChannel : public BufferHubChannel { void OnProducerClosed(); private: // Data structure to store relavant info of a newly allocated producer buffer // so that consumer channel and buffer can be created later. struct PendingBuffer { PendingBuffer(std::shared_ptr<ProducerChannel> channel, size_t slot, uint64_t mask) { producer_channel = channel; producer_slot = slot; consumer_state_mask = mask; } PendingBuffer() = delete; std::weak_ptr<ProducerChannel> producer_channel; size_t producer_slot; uint64_t consumer_state_mask; }; std::shared_ptr<ProducerQueueChannel> GetProducer() const; // Pointer to the producer channel. std::weak_ptr<Channel> producer_; // Tracks newly allocated buffer producers along with it's slot number. std::queue<std::pair<std::weak_ptr<ProducerChannel>, size_t>> pending_buffer_slots_; std::queue<PendingBuffer> pending_buffer_slots_; // Tracks how many buffers have this queue imported. size_t capacity_; Loading services/vr/bufferhubd/include/private/dvr/producer_channel.h +8 −2 Original line number Diff line number Diff line Loading @@ -53,7 +53,9 @@ class ProducerChannel : public BufferHubChannel { BufferDescription<BorrowedHandle> GetBuffer(uint64_t client_state_mask); pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message); pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message, uint64_t consumer_state_mask); pdx::Status<uint64_t> CreateConsumerStateMask(); pdx::Status<RemoteChannelHandle> OnNewConsumer(Message& message); pdx::Status<LocalFence> OnConsumerAcquire(Message& message); Loading Loading @@ -111,6 +113,10 @@ class ProducerChannel : public BufferHubChannel { pdx::Status<void> OnProducerPost(Message& message, LocalFence acquire_fence); pdx::Status<LocalFence> OnProducerGain(Message& message); // Remove consumer from atomics in shared memory based on consumer_state_mask. // This function is used for clean up for failures in CreateConsumer method. void RemoveConsumerClientMask(uint64_t consumer_state_mask); ProducerChannel(const ProducerChannel&) = delete; void operator=(const ProducerChannel&) = delete; }; Loading services/vr/bufferhubd/producer_channel.cpp +61 −29 Original line number Diff line number Diff line Loading @@ -248,21 +248,7 @@ Status<BufferDescription<BorrowedHandle>> ProducerChannel::OnGetBuffer( return {GetBuffer(BufferHubDefs::kFirstClientBitMask)}; } Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { ATRACE_NAME("ProducerChannel::CreateConsumer"); ALOGD_IF(TRACE, "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d", buffer_id(), producer_owns_); int channel_id; auto status = message.PushChannel(0, nullptr, &channel_id); if (!status) { ALOGE( "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s", status.GetErrorMessage().c_str()); return ErrorStatus(ENOMEM); } Status<uint64_t> ProducerChannel::CreateConsumerStateMask() { // Try find the next consumer state bit which has not been claimed by any // consumer yet. // memory_order_acquire is chosen here because all writes in other threads Loading @@ -277,7 +263,6 @@ Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { "consumers per producer: 63."); return ErrorStatus(E2BIG); } uint64_t updated_active_clients_bit_mask = current_active_clients_bit_mask | client_state_mask; // Set the updated value only if the current value stays the same as what was Loading @@ -286,28 +271,71 @@ Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { // thread, and the modification will be visible in other threads that acquire // active_clients_bit_mask_. If the comparison fails, load the result of // all writes from all threads to updated_active_clients_bit_mask. if (!active_clients_bit_mask_->compare_exchange_weak( // Keep on finding the next available slient state mask until succeed or out // of memory. while (!active_clients_bit_mask_->compare_exchange_weak( current_active_clients_bit_mask, updated_active_clients_bit_mask, std::memory_order_acq_rel, std::memory_order_acquire)) { ALOGE("Current active clients bit mask is changed to %" PRIx64 ", which was expected to be %" PRIx64 ".", ", which was expected to be %" PRIx64 ". Trying to generate a new client state mask to resolve race " "condition.", updated_active_clients_bit_mask, current_active_clients_bit_mask); return ErrorStatus(EBUSY); client_state_mask = BufferHubDefs::FindNextAvailableClientStateMask( current_active_clients_bit_mask | orphaned_consumer_bit_mask_); if (client_state_mask == 0ULL) { ALOGE( "ProducerChannel::CreateConsumer: reached the maximum mumber of " "consumers per producer: 63."); return ErrorStatus(E2BIG); } updated_active_clients_bit_mask = current_active_clients_bit_mask | client_state_mask; } return {client_state_mask}; } void ProducerChannel::RemoveConsumerClientMask(uint64_t consumer_state_mask) { // Clear up the buffer state and fence state in case there is already // something there due to possible race condition between producer post and // consumer failed to create channel. buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release); fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release); // Restore the consumer state bit and make it visible in other threads that // acquire the active_clients_bit_mask_. active_clients_bit_mask_->fetch_and(~consumer_state_mask, std::memory_order_release); } auto consumer = std::make_shared<ConsumerChannel>(service(), buffer_id(), channel_id, client_state_mask, shared_from_this()); Status<RemoteChannelHandle> ProducerChannel::CreateConsumer( Message& message, uint64_t consumer_state_mask) { ATRACE_NAME("ProducerChannel::CreateConsumer"); ALOGD_IF(TRACE, "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d", buffer_id(), producer_owns_); int channel_id; auto status = message.PushChannel(0, nullptr, &channel_id); if (!status) { ALOGE( "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s", status.GetErrorMessage().c_str()); RemoveConsumerClientMask(consumer_state_mask); return ErrorStatus(ENOMEM); } auto consumer = std::make_shared<ConsumerChannel>( service(), buffer_id(), channel_id, consumer_state_mask, shared_from_this()); const auto channel_status = service()->SetChannel(channel_id, consumer); if (!channel_status) { ALOGE( "ProducerChannel::CreateConsumer: failed to set new consumer channel: " "%s", channel_status.GetErrorMessage().c_str()); // Restore the consumer state bit and make it visible in other threads that // acquire the active_clients_bit_mask_. active_clients_bit_mask_->fetch_and(~client_state_mask, std::memory_order_release); RemoveConsumerClientMask(consumer_state_mask); return ErrorStatus(ENOMEM); } Loading @@ -327,7 +355,11 @@ Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) { ATRACE_NAME("ProducerChannel::OnNewConsumer"); ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id()); return CreateConsumer(message); auto status = CreateConsumerStateMask(); if (!status.ok()) { return status.error_status(); } return CreateConsumer(message, /*consumer_state_mask=*/status.get()); } Status<void> ProducerChannel::OnProducerPost(Message&, Loading Loading
services/vr/bufferhubd/consumer_queue_channel.cpp +31 −23 Original line number Diff line number Diff line Loading @@ -80,27 +80,35 @@ BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const { } void ConsumerQueueChannel::RegisterNewBuffer( const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) { ALOGD_IF(TRACE, "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d " "slot=%zu silent=%d", buffer_id(), producer_channel->buffer_id(), slot, silent_); const std::shared_ptr<ProducerChannel>& producer_channel, size_t producer_slot) { ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d", __FUNCTION__, buffer_id(), producer_channel->buffer_id(), producer_slot, silent_); // Only register buffers if the queue is not silent. if (!silent_) { pending_buffer_slots_.emplace(producer_channel, slot); if (silent_) { return; } auto status = producer_channel->CreateConsumerStateMask(); if (!status.ok()) { ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__, status.GetErrorMessage().c_str()); return; } uint64_t consumer_state_mask = status.get(); pending_buffer_slots_.emplace(producer_channel, producer_slot, consumer_state_mask); // Signal the client that there is new buffer available. SignalAvailable(); } } Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers"); ALOGD_IF(TRACE, "ConsumerQueueChannel::OnConsumerQueueImportBuffers: " "pending_buffer_slots=%zu", ATRACE_NAME(__FUNCTION__); ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__, pending_buffer_slots_.size()); // Indicate this is a silent queue that will not import buffers. Loading @@ -108,29 +116,29 @@ ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { return ErrorStatus(EBADR); while (!pending_buffer_slots_.empty()) { auto producer_channel = pending_buffer_slots_.front().first.lock(); size_t producer_slot = pending_buffer_slots_.front().second; auto producer_channel = pending_buffer_slots_.front().producer_channel.lock(); size_t producer_slot = pending_buffer_slots_.front().producer_slot; uint64_t consumer_state_mask = pending_buffer_slots_.front().consumer_state_mask; pending_buffer_slots_.pop(); // It's possible that the producer channel has expired. When this occurs, // ignore the producer channel. if (producer_channel == nullptr) { ALOGW( "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer " "channel has already been expired."); ALOGW("%s: producer channel has already been expired.", __FUNCTION__); continue; } auto status = producer_channel->CreateConsumer(message); auto status = producer_channel->CreateConsumer(message, consumer_state_mask); // If no buffers are imported successfully, clear available and return an // error. Otherwise, return all consumer handles already imported // successfully, but keep available bits on, so that the client can retry // importing remaining consumer buffers. if (!status) { ALOGE( "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create " "consumer: %s", ALOGE("%s: Failed create consumer: %s", __FUNCTION__, status.GetErrorMessage().c_str()); if (buffer_handles.empty()) { ClearAvailable(); Loading
services/vr/bufferhubd/include/private/dvr/consumer_queue_channel.h +20 −4 Original line number Diff line number Diff line Loading @@ -3,8 +3,8 @@ #include <queue> #include <private/dvr/bufferhub_rpc.h> #include <private/dvr/buffer_hub.h> #include <private/dvr/bufferhub_rpc.h> #include <private/dvr/consumer_channel.h> #include <private/dvr/producer_queue_channel.h> Loading @@ -28,7 +28,8 @@ class ConsumerQueueChannel : public BufferHubChannel { // Called by ProdcuerQueueChannel to notify consumer queue that a new // buffer has been allocated. void RegisterNewBuffer( const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot); const std::shared_ptr<ProducerChannel>& producer_channel, size_t producer_slot); // Called after clients been signaled by service that new buffer has been // allocated. Clients uses kOpConsumerQueueImportBuffers to import new Loading @@ -40,14 +41,29 @@ class ConsumerQueueChannel : public BufferHubChannel { void OnProducerClosed(); private: // Data structure to store relavant info of a newly allocated producer buffer // so that consumer channel and buffer can be created later. struct PendingBuffer { PendingBuffer(std::shared_ptr<ProducerChannel> channel, size_t slot, uint64_t mask) { producer_channel = channel; producer_slot = slot; consumer_state_mask = mask; } PendingBuffer() = delete; std::weak_ptr<ProducerChannel> producer_channel; size_t producer_slot; uint64_t consumer_state_mask; }; std::shared_ptr<ProducerQueueChannel> GetProducer() const; // Pointer to the producer channel. std::weak_ptr<Channel> producer_; // Tracks newly allocated buffer producers along with it's slot number. std::queue<std::pair<std::weak_ptr<ProducerChannel>, size_t>> pending_buffer_slots_; std::queue<PendingBuffer> pending_buffer_slots_; // Tracks how many buffers have this queue imported. size_t capacity_; Loading
services/vr/bufferhubd/include/private/dvr/producer_channel.h +8 −2 Original line number Diff line number Diff line Loading @@ -53,7 +53,9 @@ class ProducerChannel : public BufferHubChannel { BufferDescription<BorrowedHandle> GetBuffer(uint64_t client_state_mask); pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message); pdx::Status<RemoteChannelHandle> CreateConsumer(Message& message, uint64_t consumer_state_mask); pdx::Status<uint64_t> CreateConsumerStateMask(); pdx::Status<RemoteChannelHandle> OnNewConsumer(Message& message); pdx::Status<LocalFence> OnConsumerAcquire(Message& message); Loading Loading @@ -111,6 +113,10 @@ class ProducerChannel : public BufferHubChannel { pdx::Status<void> OnProducerPost(Message& message, LocalFence acquire_fence); pdx::Status<LocalFence> OnProducerGain(Message& message); // Remove consumer from atomics in shared memory based on consumer_state_mask. // This function is used for clean up for failures in CreateConsumer method. void RemoveConsumerClientMask(uint64_t consumer_state_mask); ProducerChannel(const ProducerChannel&) = delete; void operator=(const ProducerChannel&) = delete; }; Loading
services/vr/bufferhubd/producer_channel.cpp +61 −29 Original line number Diff line number Diff line Loading @@ -248,21 +248,7 @@ Status<BufferDescription<BorrowedHandle>> ProducerChannel::OnGetBuffer( return {GetBuffer(BufferHubDefs::kFirstClientBitMask)}; } Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { ATRACE_NAME("ProducerChannel::CreateConsumer"); ALOGD_IF(TRACE, "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d", buffer_id(), producer_owns_); int channel_id; auto status = message.PushChannel(0, nullptr, &channel_id); if (!status) { ALOGE( "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s", status.GetErrorMessage().c_str()); return ErrorStatus(ENOMEM); } Status<uint64_t> ProducerChannel::CreateConsumerStateMask() { // Try find the next consumer state bit which has not been claimed by any // consumer yet. // memory_order_acquire is chosen here because all writes in other threads Loading @@ -277,7 +263,6 @@ Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { "consumers per producer: 63."); return ErrorStatus(E2BIG); } uint64_t updated_active_clients_bit_mask = current_active_clients_bit_mask | client_state_mask; // Set the updated value only if the current value stays the same as what was Loading @@ -286,28 +271,71 @@ Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { // thread, and the modification will be visible in other threads that acquire // active_clients_bit_mask_. If the comparison fails, load the result of // all writes from all threads to updated_active_clients_bit_mask. if (!active_clients_bit_mask_->compare_exchange_weak( // Keep on finding the next available slient state mask until succeed or out // of memory. while (!active_clients_bit_mask_->compare_exchange_weak( current_active_clients_bit_mask, updated_active_clients_bit_mask, std::memory_order_acq_rel, std::memory_order_acquire)) { ALOGE("Current active clients bit mask is changed to %" PRIx64 ", which was expected to be %" PRIx64 ".", ", which was expected to be %" PRIx64 ". Trying to generate a new client state mask to resolve race " "condition.", updated_active_clients_bit_mask, current_active_clients_bit_mask); return ErrorStatus(EBUSY); client_state_mask = BufferHubDefs::FindNextAvailableClientStateMask( current_active_clients_bit_mask | orphaned_consumer_bit_mask_); if (client_state_mask == 0ULL) { ALOGE( "ProducerChannel::CreateConsumer: reached the maximum mumber of " "consumers per producer: 63."); return ErrorStatus(E2BIG); } updated_active_clients_bit_mask = current_active_clients_bit_mask | client_state_mask; } return {client_state_mask}; } void ProducerChannel::RemoveConsumerClientMask(uint64_t consumer_state_mask) { // Clear up the buffer state and fence state in case there is already // something there due to possible race condition between producer post and // consumer failed to create channel. buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release); fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release); // Restore the consumer state bit and make it visible in other threads that // acquire the active_clients_bit_mask_. active_clients_bit_mask_->fetch_and(~consumer_state_mask, std::memory_order_release); } auto consumer = std::make_shared<ConsumerChannel>(service(), buffer_id(), channel_id, client_state_mask, shared_from_this()); Status<RemoteChannelHandle> ProducerChannel::CreateConsumer( Message& message, uint64_t consumer_state_mask) { ATRACE_NAME("ProducerChannel::CreateConsumer"); ALOGD_IF(TRACE, "ProducerChannel::CreateConsumer: buffer_id=%d, producer_owns=%d", buffer_id(), producer_owns_); int channel_id; auto status = message.PushChannel(0, nullptr, &channel_id); if (!status) { ALOGE( "ProducerChannel::CreateConsumer: Failed to push consumer channel: %s", status.GetErrorMessage().c_str()); RemoveConsumerClientMask(consumer_state_mask); return ErrorStatus(ENOMEM); } auto consumer = std::make_shared<ConsumerChannel>( service(), buffer_id(), channel_id, consumer_state_mask, shared_from_this()); const auto channel_status = service()->SetChannel(channel_id, consumer); if (!channel_status) { ALOGE( "ProducerChannel::CreateConsumer: failed to set new consumer channel: " "%s", channel_status.GetErrorMessage().c_str()); // Restore the consumer state bit and make it visible in other threads that // acquire the active_clients_bit_mask_. active_clients_bit_mask_->fetch_and(~client_state_mask, std::memory_order_release); RemoveConsumerClientMask(consumer_state_mask); return ErrorStatus(ENOMEM); } Loading @@ -327,7 +355,11 @@ Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(Message& message) { Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) { ATRACE_NAME("ProducerChannel::OnNewConsumer"); ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id()); return CreateConsumer(message); auto status = CreateConsumerStateMask(); if (!status.ok()) { return status.error_status(); } return CreateConsumer(message, /*consumer_state_mask=*/status.get()); } Status<void> ProducerChannel::OnProducerPost(Message&, Loading