Loading system/gd/os/alarm.h +4 −1 Original line number Diff line number Diff line Loading @@ -34,6 +34,9 @@ class Alarm { // Create and register a single-shot alarm on given thread explicit Alarm(Thread* thread); // Create and register a single-shot alarm with a given reactor explicit Alarm(Reactor* reactor); // Unregister this alarm from the thread and release resource ~Alarm(); Loading @@ -47,7 +50,7 @@ class Alarm { private: Closure task_; Thread* thread_; Reactor* reactor_; int fd_ = 0; Reactor::Reactable* token_; mutable std::mutex mutex_; Loading system/gd/os/handler.h +4 −1 Original line number Diff line number Diff line Loading @@ -35,6 +35,9 @@ class Handler { // Create and register a handler on given thread explicit Handler(Thread* thread); // Create and register a handler with a given reactor explicit Handler(Reactor* reactor); // Unregister this handler from the thread and release resource. Unhandled events will be discarded and not executed. ~Handler(); Loading @@ -51,7 +54,7 @@ class Handler { private: std::queue<Closure> tasks_; Thread* thread_; Reactor* reactor_; int fd_; Reactor::Reactable* reactable_; mutable std::mutex mutex_; Loading system/gd/os/linux_generic/alarm.cc +5 −5 Original line number Diff line number Diff line Loading @@ -32,16 +32,16 @@ namespace bluetooth { namespace os { Alarm::Alarm(Thread* thread) : thread_(thread), fd_(timerfd_create(ALARM_CLOCK, 0)) { Alarm::Alarm(Reactor* reactor) : reactor_(reactor), fd_(timerfd_create(ALARM_CLOCK, 0)) { ASSERT_LOG(fd_ != -1, "cannot create timerfd: %s", strerror(errno)); token_ = thread_->GetReactor()->Register(fd_, [this] { on_fire(); }, nullptr); token_ = reactor_->Register(fd_, [this] { on_fire(); }, nullptr); } Alarm::Alarm(Thread* thread) : Alarm(thread->GetReactor()) {} Alarm::~Alarm() { thread_->GetReactor()->Unregister(token_); reactor_->Unregister(token_); int close_status; RUN_NO_INTR(close_status = close(fd_)); Loading system/gd/os/linux_generic/handler.cc +5 −5 Original line number Diff line number Diff line Loading @@ -31,16 +31,16 @@ namespace bluetooth { namespace os { Handler::Handler(Thread* thread) : thread_(thread), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { Handler::Handler(Reactor* reactor) : reactor_(reactor), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { ASSERT(fd_ != -1); reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr); reactable_ = reactor_->Register(fd_, [this] { this->handle_next_event(); }, nullptr); } Handler::Handler(Thread* thread) : Handler(thread->GetReactor()) {} Handler::~Handler() { thread_->GetReactor()->Unregister(reactable_); reactor_->Unregister(reactable_); reactable_ = nullptr; int close_status; Loading system/gd/os/linux_generic/queue.tpp +11 −17 Original line number Diff line number Diff line Loading @@ -16,33 +16,30 @@ template <typename T> Queue<T>::Queue(size_t capacity) : enqueue_callback_(nullptr), dequeue_callback_(nullptr), enqueue_(capacity), dequeue_(0){}; : enqueue_(capacity), dequeue_(0){}; template <typename T> Queue<T>::~Queue() { ASSERT(enqueue_callback_ == nullptr); ASSERT(dequeue_callback_ == nullptr); ASSERT(enqueue_.handler_ == nullptr); ASSERT(dequeue_.handler_ == nullptr); }; template <typename T> void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.handler_ == nullptr); ASSERT(enqueue_callback_ == nullptr); ASSERT(enqueue_.reactable_ == nullptr); enqueue_.handler_ = handler; enqueue_callback_ = callback; enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register( enqueue_.reactive_semaphore_.GetFd(), [this] { EnqueueCallbackInternal(); }, nullptr); enqueue_.reactable_ = enqueue_.handler_->reactor_->Register( enqueue_.reactive_semaphore_.GetFd(), [this, callback] { EnqueueCallbackInternal(callback); }, nullptr); } template <typename T> void Queue<T>::UnregisterEnqueue() { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.reactable_ != nullptr); enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_); enqueue_.handler_->reactor_->Unregister(enqueue_.reactable_); enqueue_.reactable_ = nullptr; enqueue_callback_ = nullptr; enqueue_.handler_ = nullptr; } Loading @@ -50,21 +47,18 @@ template <typename T> void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.handler_ == nullptr); ASSERT(dequeue_callback_ == nullptr); ASSERT(dequeue_.reactable_ == nullptr); dequeue_.handler_ = handler; dequeue_callback_ = callback; dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(dequeue_.reactive_semaphore_.GetFd(), [this] { dequeue_callback_(); }, nullptr); dequeue_.reactable_ = dequeue_.handler_->reactor_->Register(dequeue_.reactive_semaphore_.GetFd(), [callback] { callback(); }, nullptr); } template <typename T> void Queue<T>::UnregisterDequeue() { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.reactable_ != nullptr); dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_); dequeue_.handler_->reactor_->Unregister(dequeue_.reactable_); dequeue_.reactable_ = nullptr; dequeue_callback_ = nullptr; dequeue_.handler_ = nullptr; } Loading @@ -87,11 +81,11 @@ std::unique_ptr<T> Queue<T>::TryDequeue() { } template <typename T> void Queue<T>::EnqueueCallbackInternal() { void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) { enqueue_.reactive_semaphore_.Decrease(); { std::unique_ptr<T> data = enqueue_callback_(); std::unique_ptr<T> data = callback(); ASSERT(data != nullptr); std::lock_guard<std::mutex> lock(mutex_); queue_.push(std::move(data)); Loading Loading
system/gd/os/alarm.h +4 −1 Original line number Diff line number Diff line Loading @@ -34,6 +34,9 @@ class Alarm { // Create and register a single-shot alarm on given thread explicit Alarm(Thread* thread); // Create and register a single-shot alarm with a given reactor explicit Alarm(Reactor* reactor); // Unregister this alarm from the thread and release resource ~Alarm(); Loading @@ -47,7 +50,7 @@ class Alarm { private: Closure task_; Thread* thread_; Reactor* reactor_; int fd_ = 0; Reactor::Reactable* token_; mutable std::mutex mutex_; Loading
system/gd/os/handler.h +4 −1 Original line number Diff line number Diff line Loading @@ -35,6 +35,9 @@ class Handler { // Create and register a handler on given thread explicit Handler(Thread* thread); // Create and register a handler with a given reactor explicit Handler(Reactor* reactor); // Unregister this handler from the thread and release resource. Unhandled events will be discarded and not executed. ~Handler(); Loading @@ -51,7 +54,7 @@ class Handler { private: std::queue<Closure> tasks_; Thread* thread_; Reactor* reactor_; int fd_; Reactor::Reactable* reactable_; mutable std::mutex mutex_; Loading
system/gd/os/linux_generic/alarm.cc +5 −5 Original line number Diff line number Diff line Loading @@ -32,16 +32,16 @@ namespace bluetooth { namespace os { Alarm::Alarm(Thread* thread) : thread_(thread), fd_(timerfd_create(ALARM_CLOCK, 0)) { Alarm::Alarm(Reactor* reactor) : reactor_(reactor), fd_(timerfd_create(ALARM_CLOCK, 0)) { ASSERT_LOG(fd_ != -1, "cannot create timerfd: %s", strerror(errno)); token_ = thread_->GetReactor()->Register(fd_, [this] { on_fire(); }, nullptr); token_ = reactor_->Register(fd_, [this] { on_fire(); }, nullptr); } Alarm::Alarm(Thread* thread) : Alarm(thread->GetReactor()) {} Alarm::~Alarm() { thread_->GetReactor()->Unregister(token_); reactor_->Unregister(token_); int close_status; RUN_NO_INTR(close_status = close(fd_)); Loading
system/gd/os/linux_generic/handler.cc +5 −5 Original line number Diff line number Diff line Loading @@ -31,16 +31,16 @@ namespace bluetooth { namespace os { Handler::Handler(Thread* thread) : thread_(thread), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { Handler::Handler(Reactor* reactor) : reactor_(reactor), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { ASSERT(fd_ != -1); reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr); reactable_ = reactor_->Register(fd_, [this] { this->handle_next_event(); }, nullptr); } Handler::Handler(Thread* thread) : Handler(thread->GetReactor()) {} Handler::~Handler() { thread_->GetReactor()->Unregister(reactable_); reactor_->Unregister(reactable_); reactable_ = nullptr; int close_status; Loading
system/gd/os/linux_generic/queue.tpp +11 −17 Original line number Diff line number Diff line Loading @@ -16,33 +16,30 @@ template <typename T> Queue<T>::Queue(size_t capacity) : enqueue_callback_(nullptr), dequeue_callback_(nullptr), enqueue_(capacity), dequeue_(0){}; : enqueue_(capacity), dequeue_(0){}; template <typename T> Queue<T>::~Queue() { ASSERT(enqueue_callback_ == nullptr); ASSERT(dequeue_callback_ == nullptr); ASSERT(enqueue_.handler_ == nullptr); ASSERT(dequeue_.handler_ == nullptr); }; template <typename T> void Queue<T>::RegisterEnqueue(Handler* handler, EnqueueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.handler_ == nullptr); ASSERT(enqueue_callback_ == nullptr); ASSERT(enqueue_.reactable_ == nullptr); enqueue_.handler_ = handler; enqueue_callback_ = callback; enqueue_.reactable_ = enqueue_.handler_->thread_->GetReactor()->Register( enqueue_.reactive_semaphore_.GetFd(), [this] { EnqueueCallbackInternal(); }, nullptr); enqueue_.reactable_ = enqueue_.handler_->reactor_->Register( enqueue_.reactive_semaphore_.GetFd(), [this, callback] { EnqueueCallbackInternal(callback); }, nullptr); } template <typename T> void Queue<T>::UnregisterEnqueue() { std::lock_guard<std::mutex> lock(mutex_); ASSERT(enqueue_.reactable_ != nullptr); enqueue_.handler_->thread_->GetReactor()->Unregister(enqueue_.reactable_); enqueue_.handler_->reactor_->Unregister(enqueue_.reactable_); enqueue_.reactable_ = nullptr; enqueue_callback_ = nullptr; enqueue_.handler_ = nullptr; } Loading @@ -50,21 +47,18 @@ template <typename T> void Queue<T>::RegisterDequeue(Handler* handler, DequeueCallback callback) { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.handler_ == nullptr); ASSERT(dequeue_callback_ == nullptr); ASSERT(dequeue_.reactable_ == nullptr); dequeue_.handler_ = handler; dequeue_callback_ = callback; dequeue_.reactable_ = dequeue_.handler_->thread_->GetReactor()->Register(dequeue_.reactive_semaphore_.GetFd(), [this] { dequeue_callback_(); }, nullptr); dequeue_.reactable_ = dequeue_.handler_->reactor_->Register(dequeue_.reactive_semaphore_.GetFd(), [callback] { callback(); }, nullptr); } template <typename T> void Queue<T>::UnregisterDequeue() { std::lock_guard<std::mutex> lock(mutex_); ASSERT(dequeue_.reactable_ != nullptr); dequeue_.handler_->thread_->GetReactor()->Unregister(dequeue_.reactable_); dequeue_.handler_->reactor_->Unregister(dequeue_.reactable_); dequeue_.reactable_ = nullptr; dequeue_callback_ = nullptr; dequeue_.handler_ = nullptr; } Loading @@ -87,11 +81,11 @@ std::unique_ptr<T> Queue<T>::TryDequeue() { } template <typename T> void Queue<T>::EnqueueCallbackInternal() { void Queue<T>::EnqueueCallbackInternal(EnqueueCallback callback) { enqueue_.reactive_semaphore_.Decrease(); { std::unique_ptr<T> data = enqueue_callback_(); std::unique_ptr<T> data = callback(); ASSERT(data != nullptr); std::lock_guard<std::mutex> lock(mutex_); queue_.push(std::move(data)); Loading