Loading system/gd/common/bidi_queue_unittest.cc +1 −0 Original line number Diff line number Diff line Loading @@ -64,6 +64,7 @@ class TestBidiQueueEnd { : handler_(handler), end_(end) {} ~TestBidiQueueEnd() { handler_->Clear(); } std::promise<void>* Send(TA* value) { Loading system/gd/module.cc +4 −2 Original line number Diff line number Diff line Loading @@ -83,12 +83,14 @@ Module* ModuleRegistry::Start(const ModuleFactory* module, Thread* thread) { } void ModuleRegistry::StopAll() { // Since modules were brought up in dependency order, // it is safe to tear down by going in reverse order. // Since modules were brought up in dependency order, it is safe to tear down by going in reverse order. for (auto it = start_order_.rbegin(); it != start_order_.rend(); it++) { auto instance = started_modules_.find(*it); ASSERT(instance != started_modules_.end()); // Clear the handler before stopping the module to allow it to shut down gracefully. instance->second->handler_->Clear(); instance->second->handler_->WaitUntilStopped(kModuleStopTimeout); instance->second->Stop(); delete instance->second->handler_; Loading system/gd/os/handler.h +7 −1 Original line number Diff line number Diff line Loading @@ -46,6 +46,9 @@ class Handler { // Remove all pending events from the queue of this handler void Clear(); // Die if the current reactable doesn't stop before the timeout. Must be called after Clear() void WaitUntilStopped(std::chrono::milliseconds timeout); template <typename T> friend class Queue; Loading @@ -54,7 +57,10 @@ class Handler { friend class RepeatingAlarm; private: std::queue<Closure> tasks_; inline bool was_cleared() const { return tasks_ == nullptr; }; std::queue<Closure>* tasks_; Thread* thread_; int fd_; Reactor::Reactable* reactable_; Loading system/gd/os/linux_generic/alarm_unittest.cc +1 −0 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ class AlarmTest : public ::testing::Test { void TearDown() override { delete alarm_; handler_->Clear(); delete handler_; delete thread_; } Loading system/gd/os/linux_generic/handler.cc +36 −22 Original line number Diff line number Diff line Loading @@ -17,8 +17,8 @@ #include "os/handler.h" #include <sys/eventfd.h> #include <cstring> #include <unistd.h> #include <cstring> #include "os/log.h" #include "os/reactor.h" Loading @@ -31,15 +31,17 @@ namespace bluetooth { namespace os { Handler::Handler(Thread* thread) : thread_(thread), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { Handler::Handler(Thread* thread) : tasks_(new std::queue<Closure>()), thread_(thread), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { ASSERT(fd_ != -1); reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr); } Handler::~Handler() { thread_->GetReactor()->Unregister(reactable_); reactable_ = nullptr; { std::lock_guard<std::mutex> lock(mutex_); ASSERT_LOG(was_cleared(), "Handlers must be cleared before they are destroyed"); } int close_status; RUN_NO_INTR(close_status = close(fd_)); Loading @@ -49,7 +51,10 @@ Handler::~Handler() { void Handler::Post(Closure closure) { { std::lock_guard<std::mutex> lock(mutex_); tasks_.emplace(std::move(closure)); if (was_cleared()) { return; } tasks_->emplace(std::move(closure)); } uint64_t val = 1; auto write_result = eventfd_write(fd_, val); Loading @@ -57,32 +62,41 @@ void Handler::Post(Closure closure) { } void Handler::Clear() { std::queue<Closure>* tmp = nullptr; { std::lock_guard<std::mutex> lock(mutex_); std::queue<Closure> empty; std::swap(tasks_, empty); ASSERT_LOG(!was_cleared(), "Handlers must only be cleared once"); std::swap(tasks_, tmp); } delete tmp; uint64_t val; while (eventfd_read(fd_, &val) == 0) { } thread_->GetReactor()->Unregister(reactable_); reactable_ = nullptr; } void Handler::WaitUntilStopped(std::chrono::milliseconds timeout) { ASSERT(reactable_ == nullptr); ASSERT(thread_->GetReactor()->WaitForUnregisteredReactable(timeout)); } void Handler::handle_next_event() { Closure closure; { std::lock_guard<std::mutex> lock(mutex_); uint64_t val = 0; auto read_result = eventfd_read(fd_, &val); if (read_result == -1 && errno == EAGAIN) { // We were told there was an item, but it was removed before we got there // (aka the queue was cleared). Not a fatal error, so just bail. if (was_cleared()) { return; } ASSERT_LOG(read_result != -1, "eventfd read error %d %s", errno, strerror(errno)); ASSERT(read_result != -1); { std::lock_guard<std::mutex> lock(mutex_); closure = std::move(tasks_.front()); tasks_.pop(); closure = std::move(tasks_->front()); tasks_->pop(); } closure(); } Loading Loading
system/gd/common/bidi_queue_unittest.cc +1 −0 Original line number Diff line number Diff line Loading @@ -64,6 +64,7 @@ class TestBidiQueueEnd { : handler_(handler), end_(end) {} ~TestBidiQueueEnd() { handler_->Clear(); } std::promise<void>* Send(TA* value) { Loading
system/gd/module.cc +4 −2 Original line number Diff line number Diff line Loading @@ -83,12 +83,14 @@ Module* ModuleRegistry::Start(const ModuleFactory* module, Thread* thread) { } void ModuleRegistry::StopAll() { // Since modules were brought up in dependency order, // it is safe to tear down by going in reverse order. // Since modules were brought up in dependency order, it is safe to tear down by going in reverse order. for (auto it = start_order_.rbegin(); it != start_order_.rend(); it++) { auto instance = started_modules_.find(*it); ASSERT(instance != started_modules_.end()); // Clear the handler before stopping the module to allow it to shut down gracefully. instance->second->handler_->Clear(); instance->second->handler_->WaitUntilStopped(kModuleStopTimeout); instance->second->Stop(); delete instance->second->handler_; Loading
system/gd/os/handler.h +7 −1 Original line number Diff line number Diff line Loading @@ -46,6 +46,9 @@ class Handler { // Remove all pending events from the queue of this handler void Clear(); // Die if the current reactable doesn't stop before the timeout. Must be called after Clear() void WaitUntilStopped(std::chrono::milliseconds timeout); template <typename T> friend class Queue; Loading @@ -54,7 +57,10 @@ class Handler { friend class RepeatingAlarm; private: std::queue<Closure> tasks_; inline bool was_cleared() const { return tasks_ == nullptr; }; std::queue<Closure>* tasks_; Thread* thread_; int fd_; Reactor::Reactable* reactable_; Loading
system/gd/os/linux_generic/alarm_unittest.cc +1 −0 Original line number Diff line number Diff line Loading @@ -34,6 +34,7 @@ class AlarmTest : public ::testing::Test { void TearDown() override { delete alarm_; handler_->Clear(); delete handler_; delete thread_; } Loading
system/gd/os/linux_generic/handler.cc +36 −22 Original line number Diff line number Diff line Loading @@ -17,8 +17,8 @@ #include "os/handler.h" #include <sys/eventfd.h> #include <cstring> #include <unistd.h> #include <cstring> #include "os/log.h" #include "os/reactor.h" Loading @@ -31,15 +31,17 @@ namespace bluetooth { namespace os { Handler::Handler(Thread* thread) : thread_(thread), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { Handler::Handler(Thread* thread) : tasks_(new std::queue<Closure>()), thread_(thread), fd_(eventfd(0, EFD_SEMAPHORE | EFD_NONBLOCK)) { ASSERT(fd_ != -1); reactable_ = thread_->GetReactor()->Register(fd_, [this] { this->handle_next_event(); }, nullptr); } Handler::~Handler() { thread_->GetReactor()->Unregister(reactable_); reactable_ = nullptr; { std::lock_guard<std::mutex> lock(mutex_); ASSERT_LOG(was_cleared(), "Handlers must be cleared before they are destroyed"); } int close_status; RUN_NO_INTR(close_status = close(fd_)); Loading @@ -49,7 +51,10 @@ Handler::~Handler() { void Handler::Post(Closure closure) { { std::lock_guard<std::mutex> lock(mutex_); tasks_.emplace(std::move(closure)); if (was_cleared()) { return; } tasks_->emplace(std::move(closure)); } uint64_t val = 1; auto write_result = eventfd_write(fd_, val); Loading @@ -57,32 +62,41 @@ void Handler::Post(Closure closure) { } void Handler::Clear() { std::queue<Closure>* tmp = nullptr; { std::lock_guard<std::mutex> lock(mutex_); std::queue<Closure> empty; std::swap(tasks_, empty); ASSERT_LOG(!was_cleared(), "Handlers must only be cleared once"); std::swap(tasks_, tmp); } delete tmp; uint64_t val; while (eventfd_read(fd_, &val) == 0) { } thread_->GetReactor()->Unregister(reactable_); reactable_ = nullptr; } void Handler::WaitUntilStopped(std::chrono::milliseconds timeout) { ASSERT(reactable_ == nullptr); ASSERT(thread_->GetReactor()->WaitForUnregisteredReactable(timeout)); } void Handler::handle_next_event() { Closure closure; { std::lock_guard<std::mutex> lock(mutex_); uint64_t val = 0; auto read_result = eventfd_read(fd_, &val); if (read_result == -1 && errno == EAGAIN) { // We were told there was an item, but it was removed before we got there // (aka the queue was cleared). Not a fatal error, so just bail. if (was_cleared()) { return; } ASSERT_LOG(read_result != -1, "eventfd read error %d %s", errno, strerror(errno)); ASSERT(read_result != -1); { std::lock_guard<std::mutex> lock(mutex_); closure = std::move(tasks_.front()); tasks_.pop(); closure = std::move(tasks_->front()); tasks_->pop(); } closure(); } Loading