Loading system/gd/os/linux_generic/queue_unittest.cc +12 −0 Original line number Original line Diff line number Diff line Loading @@ -892,6 +892,18 @@ TEST_F(EnqueueBufferTest, clear) { ASSERT_FALSE(enqueue_.registered_); ASSERT_FALSE(enqueue_.registered_); } } TEST_F(EnqueueBufferTest, delete_when_in_callback) { Queue<int>* queue = new Queue<int>(kQueueSize); EnqueueBuffer<int>* enqueue_buffer = new EnqueueBuffer<int>(queue); int num_items = 10; for (int i = 0; i < num_items; i++) { enqueue_buffer->Enqueue(std::make_unique<int>(i), handler_); } delete enqueue_buffer; delete queue; } } // namespace } // namespace } // namespace os } // namespace os } // namespace bluetooth } // namespace bluetooth system/gd/os/linux_generic/reactor.cc +2 −0 Original line number Original line Diff line number Diff line Loading @@ -193,10 +193,12 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { } } bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) { bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) { std::lock_guard<std::mutex> lock(mutex_); if (executing_reactable_finished_ == nullptr) { if (executing_reactable_finished_ == nullptr) { return true; return true; } } auto stop_status = executing_reactable_finished_->wait_for(timeout); auto stop_status = executing_reactable_finished_->wait_for(timeout); executing_reactable_finished_ = nullptr; if (stop_status != std::future_status::ready) { if (stop_status != std::future_status::ready) { LOG_ERROR("Unregister reactable timed out"); LOG_ERROR("Unregister reactable timed out"); } } Loading system/gd/os/queue.h +10 −3 Original line number Original line Diff line number Diff line Loading @@ -106,17 +106,23 @@ class EnqueueBuffer { public: public: EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {} EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {} ~EnqueueBuffer() { if (enqueue_registered_.exchange(false)) { queue_->UnregisterEnqueue(); } } void Enqueue(std::unique_ptr<T> t, os::Handler* handler) { void Enqueue(std::unique_ptr<T> t, os::Handler* handler) { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_); buffer_.push(std::move(t)); buffer_.push(std::move(t)); if (buffer_.size() == 1) { if (!enqueue_registered_.exchange(true)) { queue_->RegisterEnqueue(handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this))); queue_->RegisterEnqueue(handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this))); } } } } void Clear() { void Clear() { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_); if (!buffer_.empty()) { if (enqueue_registered_.exchange(false)) { queue_->UnregisterEnqueue(); queue_->UnregisterEnqueue(); std::queue<std::unique_ptr<T>> empty; std::queue<std::unique_ptr<T>> empty; std::swap(buffer_, empty); std::swap(buffer_, empty); Loading @@ -128,7 +134,7 @@ class EnqueueBuffer { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_); std::unique_ptr<T> enqueued_t = std::move(buffer_.front()); std::unique_ptr<T> enqueued_t = std::move(buffer_.front()); buffer_.pop(); buffer_.pop(); if (buffer_.empty()) { if (buffer_.empty() && enqueue_registered_.exchange(false)) { queue_->UnregisterEnqueue(); queue_->UnregisterEnqueue(); } } return enqueued_t; return enqueued_t; Loading @@ -136,6 +142,7 @@ class EnqueueBuffer { mutable std::mutex mutex_; mutable std::mutex mutex_; IQueueEnqueue<T>* queue_; IQueueEnqueue<T>* queue_; std::atomic_bool enqueue_registered_ = false; std::queue<std::unique_ptr<T>> buffer_; std::queue<std::unique_ptr<T>> buffer_; }; }; Loading Loading
system/gd/os/linux_generic/queue_unittest.cc +12 −0 Original line number Original line Diff line number Diff line Loading @@ -892,6 +892,18 @@ TEST_F(EnqueueBufferTest, clear) { ASSERT_FALSE(enqueue_.registered_); ASSERT_FALSE(enqueue_.registered_); } } TEST_F(EnqueueBufferTest, delete_when_in_callback) { Queue<int>* queue = new Queue<int>(kQueueSize); EnqueueBuffer<int>* enqueue_buffer = new EnqueueBuffer<int>(queue); int num_items = 10; for (int i = 0; i < num_items; i++) { enqueue_buffer->Enqueue(std::make_unique<int>(i), handler_); } delete enqueue_buffer; delete queue; } } // namespace } // namespace } // namespace os } // namespace os } // namespace bluetooth } // namespace bluetooth
system/gd/os/linux_generic/reactor.cc +2 −0 Original line number Original line Diff line number Diff line Loading @@ -193,10 +193,12 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { } } bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) { bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) { std::lock_guard<std::mutex> lock(mutex_); if (executing_reactable_finished_ == nullptr) { if (executing_reactable_finished_ == nullptr) { return true; return true; } } auto stop_status = executing_reactable_finished_->wait_for(timeout); auto stop_status = executing_reactable_finished_->wait_for(timeout); executing_reactable_finished_ = nullptr; if (stop_status != std::future_status::ready) { if (stop_status != std::future_status::ready) { LOG_ERROR("Unregister reactable timed out"); LOG_ERROR("Unregister reactable timed out"); } } Loading
system/gd/os/queue.h +10 −3 Original line number Original line Diff line number Diff line Loading @@ -106,17 +106,23 @@ class EnqueueBuffer { public: public: EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {} EnqueueBuffer(IQueueEnqueue<T>* queue) : queue_(queue) {} ~EnqueueBuffer() { if (enqueue_registered_.exchange(false)) { queue_->UnregisterEnqueue(); } } void Enqueue(std::unique_ptr<T> t, os::Handler* handler) { void Enqueue(std::unique_ptr<T> t, os::Handler* handler) { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_); buffer_.push(std::move(t)); buffer_.push(std::move(t)); if (buffer_.size() == 1) { if (!enqueue_registered_.exchange(true)) { queue_->RegisterEnqueue(handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this))); queue_->RegisterEnqueue(handler, common::Bind(&EnqueueBuffer<T>::enqueue_callback, common::Unretained(this))); } } } } void Clear() { void Clear() { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_); if (!buffer_.empty()) { if (enqueue_registered_.exchange(false)) { queue_->UnregisterEnqueue(); queue_->UnregisterEnqueue(); std::queue<std::unique_ptr<T>> empty; std::queue<std::unique_ptr<T>> empty; std::swap(buffer_, empty); std::swap(buffer_, empty); Loading @@ -128,7 +134,7 @@ class EnqueueBuffer { std::lock_guard<std::mutex> lock(mutex_); std::lock_guard<std::mutex> lock(mutex_); std::unique_ptr<T> enqueued_t = std::move(buffer_.front()); std::unique_ptr<T> enqueued_t = std::move(buffer_.front()); buffer_.pop(); buffer_.pop(); if (buffer_.empty()) { if (buffer_.empty() && enqueue_registered_.exchange(false)) { queue_->UnregisterEnqueue(); queue_->UnregisterEnqueue(); } } return enqueued_t; return enqueued_t; Loading @@ -136,6 +142,7 @@ class EnqueueBuffer { mutable std::mutex mutex_; mutable std::mutex mutex_; IQueueEnqueue<T>* queue_; IQueueEnqueue<T>* queue_; std::atomic_bool enqueue_registered_ = false; std::queue<std::unique_ptr<T>> buffer_; std::queue<std::unique_ptr<T>> buffer_; }; }; Loading