Loading system/gd/os/linux_generic/queue.tppdeleted 100644 → 0 +0 −114 Original line number Diff line number Diff line /* * Copyright 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ template <typename T> Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){}; template <typename T> Queue<T>::~Queue() { ASSERT_LOG(enqueue_.handler_ == nullptr, "Enqueue is not unregistered"); ASSERT_LOG(dequeue_.handler_ == nullptr, "Dequeue is not unregistered"); }; template <typename T> void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.handler_ == nullptr); ASSERT(enqueue_.reactable_ == nullptr); enqueue_.handler_ = handler; enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register( enqueue_.reactive_semaphore_.GetFd(), base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this), std::move(callback)), base::Closure()); } template <typename T> void Queue<T>::UnregisterEnqueue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.reactable_ != nullptr); reactor = enqueue_.handler_->thread_->GetReactor(); wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread()); to_unregister = enqueue_.reactable_; enqueue_.reactable_ = nullptr; enqueue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.handler_ == nullptr); ASSERT(dequeue_.reactable_ == nullptr); dequeue_.handler_ = handler; dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register( dequeue_.reactive_semaphore_.GetFd(), callback, base::Closure()); } template <typename T> void Queue<T>::UnregisterDequeue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.reactable_ != nullptr); reactor = dequeue_.handler_->thread_->GetReactor(); wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread()); to_unregister = dequeue_.reactable_; dequeue_.reactable_ = nullptr; dequeue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> std::unique_ptr<T> Queue<T>::TryDequeue() { std::lock_guard<std::mutex> lock(mutex_); if (queue_.empty()) { return nullptr; } dequeue_.reactive_semaphore_.Decrease(); std::unique_ptr<T> data = std::move(queue_.front()); queue_.pop(); enqueue_.reactive_semaphore_.Increase(); return data; } template <typename T> void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) { std::unique_ptr<T> data = callback.Run(); ASSERT(data != nullptr); std::lock_guard<std::mutex> lock(mutex_); enqueue_.reactive_semaphore_.Decrease(); queue_.push(std::move(data)); dequeue_.reactive_semaphore_.Increase(); } system/gd/os/queue.h +98 −1 Original line number Diff line number Diff line Loading @@ -157,7 +157,104 @@ class EnqueueBuffer { common::OnceClosure callback_on_empty_; }; #include "os/linux_generic/queue.tpp" template <typename T> Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){}; template <typename T> Queue<T>::~Queue() { ASSERT_LOG(enqueue_.handler_ == nullptr, "Enqueue is not unregistered"); ASSERT_LOG(dequeue_.handler_ == nullptr, "Dequeue is not unregistered"); }; template <typename T> void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.handler_ == nullptr); ASSERT(enqueue_.reactable_ == nullptr); enqueue_.handler_ = handler; enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register( enqueue_.reactive_semaphore_.GetFd(), base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this), std::move(callback)), base::Closure()); } template <typename T> void Queue<T>::UnregisterEnqueue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.reactable_ != nullptr); reactor = enqueue_.handler_->thread_->GetReactor(); wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread()); to_unregister = enqueue_.reactable_; enqueue_.reactable_ = nullptr; enqueue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.handler_ == nullptr); ASSERT(dequeue_.reactable_ == nullptr); dequeue_.handler_ = handler; dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register( dequeue_.reactive_semaphore_.GetFd(), callback, base::Closure()); } template <typename T> void Queue<T>::UnregisterDequeue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.reactable_ != nullptr); reactor = dequeue_.handler_->thread_->GetReactor(); wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread()); to_unregister = dequeue_.reactable_; dequeue_.reactable_ = nullptr; dequeue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> std::unique_ptr<T> Queue<T>::TryDequeue() { std::lock_guard<std::mutex> lock(mutex_); if (queue_.empty()) { return nullptr; } dequeue_.reactive_semaphore_.Decrease(); std::unique_ptr<T> data = std::move(queue_.front()); queue_.pop(); enqueue_.reactive_semaphore_.Increase(); return data; } template <typename T> void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) { std::unique_ptr<T> data = callback.Run(); ASSERT(data != nullptr); std::lock_guard<std::mutex> lock(mutex_); enqueue_.reactive_semaphore_.Decrease(); queue_.push(std::move(data)); dequeue_.reactive_semaphore_.Increase(); } } // namespace os } // namespace bluetooth Loading
system/gd/os/linux_generic/queue.tppdeleted 100644 → 0 +0 −114 Original line number Diff line number Diff line /* * Copyright 2019 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ template <typename T> Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){}; template <typename T> Queue<T>::~Queue() { ASSERT_LOG(enqueue_.handler_ == nullptr, "Enqueue is not unregistered"); ASSERT_LOG(dequeue_.handler_ == nullptr, "Dequeue is not unregistered"); }; template <typename T> void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.handler_ == nullptr); ASSERT(enqueue_.reactable_ == nullptr); enqueue_.handler_ = handler; enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register( enqueue_.reactive_semaphore_.GetFd(), base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this), std::move(callback)), base::Closure()); } template <typename T> void Queue<T>::UnregisterEnqueue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.reactable_ != nullptr); reactor = enqueue_.handler_->thread_->GetReactor(); wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread()); to_unregister = enqueue_.reactable_; enqueue_.reactable_ = nullptr; enqueue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.handler_ == nullptr); ASSERT(dequeue_.reactable_ == nullptr); dequeue_.handler_ = handler; dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register( dequeue_.reactive_semaphore_.GetFd(), callback, base::Closure()); } template <typename T> void Queue<T>::UnregisterDequeue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.reactable_ != nullptr); reactor = dequeue_.handler_->thread_->GetReactor(); wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread()); to_unregister = dequeue_.reactable_; dequeue_.reactable_ = nullptr; dequeue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> std::unique_ptr<T> Queue<T>::TryDequeue() { std::lock_guard<std::mutex> lock(mutex_); if (queue_.empty()) { return nullptr; } dequeue_.reactive_semaphore_.Decrease(); std::unique_ptr<T> data = std::move(queue_.front()); queue_.pop(); enqueue_.reactive_semaphore_.Increase(); return data; } template <typename T> void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) { std::unique_ptr<T> data = callback.Run(); ASSERT(data != nullptr); std::lock_guard<std::mutex> lock(mutex_); enqueue_.reactive_semaphore_.Decrease(); queue_.push(std::move(data)); dequeue_.reactive_semaphore_.Increase(); }
system/gd/os/queue.h +98 −1 Original line number Diff line number Diff line Loading @@ -157,7 +157,104 @@ class EnqueueBuffer { common::OnceClosure callback_on_empty_; }; #include "os/linux_generic/queue.tpp" template <typename T> Queue<T>::Queue(size_t capacity) : enqueue_(capacity), dequeue_(0){}; template <typename T> Queue<T>::~Queue() { ASSERT_LOG(enqueue_.handler_ == nullptr, "Enqueue is not unregistered"); ASSERT_LOG(dequeue_.handler_ == nullptr, "Dequeue is not unregistered"); }; template <typename T> void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.handler_ == nullptr); ASSERT(enqueue_.reactable_ == nullptr); enqueue_.handler_ = handler; enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register( enqueue_.reactive_semaphore_.GetFd(), base::Bind(&Queue<T>::EnqueueCallbackInternal, base::Unretained(this), std::move(callback)), base::Closure()); } template <typename T> void Queue<T>::UnregisterEnqueue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.reactable_ != nullptr); reactor = enqueue_.handler_->thread_->GetReactor(); wait_for_unregister = (!enqueue_.handler_->thread_->IsSameThread()); to_unregister = enqueue_.reactable_; enqueue_.reactable_ = nullptr; enqueue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.handler_ == nullptr); ASSERT(dequeue_.reactable_ == nullptr); dequeue_.handler_ = handler; dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register( dequeue_.reactive_semaphore_.GetFd(), callback, base::Closure()); } template <typename T> void Queue<T>::UnregisterDequeue() { Reactor* reactor = nullptr; Reactor::Reactable* to_unregister = nullptr; bool wait_for_unregister = false; { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.reactable_ != nullptr); reactor = dequeue_.handler_->thread_->GetReactor(); wait_for_unregister = (!dequeue_.handler_->thread_->IsSameThread()); to_unregister = dequeue_.reactable_; dequeue_.reactable_ = nullptr; dequeue_.handler_ = nullptr; } reactor->Unregister(to_unregister); if (wait_for_unregister) { reactor->WaitForUnregisteredReactable(std::chrono::milliseconds(1000)); } } template <typename T> std::unique_ptr<T> Queue<T>::TryDequeue() { std::lock_guard<std::mutex> lock(mutex_); if (queue_.empty()) { return nullptr; } dequeue_.reactive_semaphore_.Decrease(); std::unique_ptr<T> data = std::move(queue_.front()); queue_.pop(); enqueue_.reactive_semaphore_.Increase(); return data; } template <typename T> void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) { std::unique_ptr<T> data = callback.Run(); ASSERT(data != nullptr); std::lock_guard<std::mutex> lock(mutex_); enqueue_.reactive_semaphore_.Decrease(); queue_.push(std::move(data)); dequeue_.reactive_semaphore_.Increase(); } } // namespace os } // namespace bluetooth