Loading system/gd/os/linux_generic/reactor.cc +19 −5 Original line number Diff line number Diff line Loading @@ -50,6 +50,7 @@ class Reactor::Reactable { bool is_executing_; bool removed_; std::mutex mutex_; std::unique_ptr<std::promise<void>> finished_promise_; }; Reactor::Reactor() Loading Loading @@ -81,8 +82,8 @@ Reactor::~Reactor() { } void Reactor::Run() { bool previously_running = is_running_.exchange(true); ASSERT(!previously_running); bool already_running = is_running_.exchange(true); ASSERT(!already_running); for (;;) { { Loading Loading @@ -113,7 +114,7 @@ void Reactor::Run() { } { std::unique_lock<std::mutex> reactable_lock(reactable->mutex_); std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); lock.unlock(); reactable->is_executing_ = true; } Loading @@ -123,9 +124,12 @@ void Reactor::Run() { if (event.events & EPOLLOUT && reactable->on_write_ready_ != nullptr) { reactable->on_write_ready_(); } { std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); reactable->is_executing_ = false; } if (reactable->removed_) { reactable->finished_promise_->set_value(); delete reactable; } } Loading Loading @@ -180,6 +184,8 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe. if (reactable->is_executing_) { reactable->removed_ = true; reactable->finished_promise_ = std::make_unique<std::promise<void>>(); executing_reactable_finished_ = std::make_unique<std::future<void>>(reactable->finished_promise_->get_future()); delaying_delete_until_callback_finished = true; } } Loading @@ -189,6 +195,14 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { } } bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) { if (executing_reactable_finished_ == nullptr) { return true; } auto stop_status = executing_reactable_finished_->wait_for(timeout); return stop_status == std::future_status::ready; } void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) { ASSERT(reactable != nullptr); Loading system/gd/os/linux_generic/reactor_unittest.cc +34 −0 Original line number Diff line number Diff line Loading @@ -229,6 +229,40 @@ TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_) { reactor_thread.join(); } TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_fails) { FakeRunningReactable fake_reactable; auto* reactable = reactor_->Register(fake_reactable.fd_, std::bind(&FakeRunningReactable::OnReadReady, &fake_reactable), nullptr); auto reactor_thread = std::thread(&Reactor::Run, reactor_); auto write_result = eventfd_write(fake_reactable.fd_, 1); ASSERT_EQ(write_result, 0); fake_reactable.started.get_future().wait(); reactor_->Unregister(reactable); ASSERT_FALSE(reactor_->WaitForUnregisteredReactable(std::chrono::milliseconds(1))); fake_reactable.can_finish.set_value(); fake_reactable.finished.get_future().wait(); reactor_->Stop(); reactor_thread.join(); } TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_succeeds) { FakeRunningReactable fake_reactable; auto* reactable = reactor_->Register(fake_reactable.fd_, std::bind(&FakeRunningReactable::OnReadReady, &fake_reactable), nullptr); auto reactor_thread = std::thread(&Reactor::Run, reactor_); auto write_result = eventfd_write(fake_reactable.fd_, 1); ASSERT_EQ(write_result, 0); fake_reactable.started.get_future().wait(); reactor_->Unregister(reactable); fake_reactable.can_finish.set_value(); fake_reactable.finished.get_future().wait(); ASSERT_TRUE(reactor_->WaitForUnregisteredReactable(std::chrono::milliseconds(1))); reactor_->Stop(); reactor_thread.join(); } TEST_F(ReactorTest, hot_unregister_from_different_thread) { FakeReactable fake_reactable; auto* reactable = Loading system/gd/os/reactor.h +5 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <sys/epoll.h> #include <atomic> #include <functional> #include <future> #include <list> #include <mutex> #include <thread> Loading Loading @@ -62,6 +63,9 @@ class Reactor { // Unregister a reactable from this reactor void Unregister(Reactable* reactable); // Wait for up to timeout milliseconds, and return true if the reactable finished executing. bool WaitForUnregisteredReactable(std::chrono::milliseconds timeout); // Modify the registration for a reactable with given reactable void ModifyRegistration(Reactable* reactable, Closure on_read_ready, Closure on_write_ready); Loading @@ -71,6 +75,7 @@ class Reactor { int control_fd_; std::atomic<bool> is_running_; std::list<Reactable*> invalidation_list_; std::unique_ptr<std::future<void>> executing_reactable_finished_; }; } // namespace os Loading Loading
system/gd/os/linux_generic/reactor.cc +19 −5 Original line number Diff line number Diff line Loading @@ -50,6 +50,7 @@ class Reactor::Reactable { bool is_executing_; bool removed_; std::mutex mutex_; std::unique_ptr<std::promise<void>> finished_promise_; }; Reactor::Reactor() Loading Loading @@ -81,8 +82,8 @@ Reactor::~Reactor() { } void Reactor::Run() { bool previously_running = is_running_.exchange(true); ASSERT(!previously_running); bool already_running = is_running_.exchange(true); ASSERT(!already_running); for (;;) { { Loading Loading @@ -113,7 +114,7 @@ void Reactor::Run() { } { std::unique_lock<std::mutex> reactable_lock(reactable->mutex_); std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); lock.unlock(); reactable->is_executing_ = true; } Loading @@ -123,9 +124,12 @@ void Reactor::Run() { if (event.events & EPOLLOUT && reactable->on_write_ready_ != nullptr) { reactable->on_write_ready_(); } { std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); reactable->is_executing_ = false; } if (reactable->removed_) { reactable->finished_promise_->set_value(); delete reactable; } } Loading Loading @@ -180,6 +184,8 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe. if (reactable->is_executing_) { reactable->removed_ = true; reactable->finished_promise_ = std::make_unique<std::promise<void>>(); executing_reactable_finished_ = std::make_unique<std::future<void>>(reactable->finished_promise_->get_future()); delaying_delete_until_callback_finished = true; } } Loading @@ -189,6 +195,14 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { } } bool Reactor::WaitForUnregisteredReactable(std::chrono::milliseconds timeout) { if (executing_reactable_finished_ == nullptr) { return true; } auto stop_status = executing_reactable_finished_->wait_for(timeout); return stop_status == std::future_status::ready; } void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) { ASSERT(reactable != nullptr); Loading
system/gd/os/linux_generic/reactor_unittest.cc +34 −0 Original line number Diff line number Diff line Loading @@ -229,6 +229,40 @@ TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_) { reactor_thread.join(); } TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_fails) { FakeRunningReactable fake_reactable; auto* reactable = reactor_->Register(fake_reactable.fd_, std::bind(&FakeRunningReactable::OnReadReady, &fake_reactable), nullptr); auto reactor_thread = std::thread(&Reactor::Run, reactor_); auto write_result = eventfd_write(fake_reactable.fd_, 1); ASSERT_EQ(write_result, 0); fake_reactable.started.get_future().wait(); reactor_->Unregister(reactable); ASSERT_FALSE(reactor_->WaitForUnregisteredReactable(std::chrono::milliseconds(1))); fake_reactable.can_finish.set_value(); fake_reactable.finished.get_future().wait(); reactor_->Stop(); reactor_thread.join(); } TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_wait_succeeds) { FakeRunningReactable fake_reactable; auto* reactable = reactor_->Register(fake_reactable.fd_, std::bind(&FakeRunningReactable::OnReadReady, &fake_reactable), nullptr); auto reactor_thread = std::thread(&Reactor::Run, reactor_); auto write_result = eventfd_write(fake_reactable.fd_, 1); ASSERT_EQ(write_result, 0); fake_reactable.started.get_future().wait(); reactor_->Unregister(reactable); fake_reactable.can_finish.set_value(); fake_reactable.finished.get_future().wait(); ASSERT_TRUE(reactor_->WaitForUnregisteredReactable(std::chrono::milliseconds(1))); reactor_->Stop(); reactor_thread.join(); } TEST_F(ReactorTest, hot_unregister_from_different_thread) { FakeReactable fake_reactable; auto* reactable = Loading
system/gd/os/reactor.h +5 −0 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <sys/epoll.h> #include <atomic> #include <functional> #include <future> #include <list> #include <mutex> #include <thread> Loading Loading @@ -62,6 +63,9 @@ class Reactor { // Unregister a reactable from this reactor void Unregister(Reactable* reactable); // Wait for up to timeout milliseconds, and return true if the reactable finished executing. bool WaitForUnregisteredReactable(std::chrono::milliseconds timeout); // Modify the registration for a reactable with given reactable void ModifyRegistration(Reactable* reactable, Closure on_read_ready, Closure on_write_ready); Loading @@ -71,6 +75,7 @@ class Reactor { int control_fd_; std::atomic<bool> is_running_; std::list<Reactable*> invalidation_list_; std::unique_ptr<std::future<void>> executing_reactable_finished_; }; } // namespace os Loading