Loading system/gd/os/linux_generic/reactor.cc +26 −26 Original line number Diff line number Diff line Loading @@ -42,19 +42,20 @@ class Reactor::Reactable { : fd_(fd), on_read_ready_(std::move(on_read_ready)), on_write_ready_(std::move(on_write_ready)), is_executing_(false) {} is_executing_(false), removed_(false) {} const int fd_; Closure on_read_ready_; Closure on_write_ready_; bool is_executing_; std::recursive_mutex lock_; bool removed_; std::mutex mutex_; }; Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false), reactable_removed_(false) { is_running_(false) { RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC)); ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno)); Loading Loading @@ -105,28 +106,27 @@ void Reactor::Run() { return; } auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr); { std::unique_lock<std::mutex> lock(mutex_); // See if this reactable has been removed in the meantime. if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) { continue; } std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_); { std::unique_lock<std::mutex> reactable_lock(reactable->mutex_); lock.unlock(); reactable_removed_ = false; reactable->is_executing_ = true; } if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && reactable->on_read_ready_ != nullptr) { reactable->on_read_ready_(); } if (!reactable_removed_ && event.events & EPOLLOUT && reactable->on_write_ready_ != nullptr) { if (event.events & EPOLLOUT && reactable->on_write_ready_ != nullptr) { reactable->on_write_ready_(); } std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); reactable->is_executing_ = false; } if (reactable_removed_) { if (reactable->removed_) { delete reactable; reactable_removed_ = false; } } } Loading Loading @@ -168,7 +168,7 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { bool delaying_delete_until_callback_finished = false; { int result; std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_); std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr)); if (result == -1 && errno == ENOENT) { LOG_INFO("reactable is invalid or unregistered"); Loading @@ -176,10 +176,10 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { ASSERT(result != -1); } // If we are unregistering during the callback event from this reactable, we delete it after the callback is executed. // reactable->is_executing_ is protected by reactable->lock_, so it's thread safe. // If we are unregistering during the callback event from this reactable, we delete it after the callback is // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe. if (reactable->is_executing_) { reactable_removed_ = true; reactable->removed_ = true; delaying_delete_until_callback_finished = true; } } Loading @@ -200,7 +200,7 @@ void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ poll_event_type |= EPOLLOUT; } { std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_); std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); reactable->on_read_ready_ = std::move(on_read_ready); reactable->on_write_ready_ = std::move(on_write_ready); } Loading system/gd/os/linux_generic/reactor_unittest.cc +43 −0 Original line number Diff line number Diff line Loading @@ -126,6 +126,33 @@ class FakeReactable { uint64_t output_data_ = kSampleOutputValue; }; class FakeRunningReactable { public: FakeRunningReactable() : fd_(eventfd(0, 0)) { EXPECT_NE(fd_, -1); } ~FakeRunningReactable() { close(fd_); } void OnReadReady() { uint64_t value = 0; auto read_result = eventfd_read(fd_, &value); ASSERT_EQ(read_result, 0); started.set_value(); can_finish.get_future().wait(); finished.set_value(); } Reactor::Reactable* reactable_ = nullptr; int fd_; std::promise<void> started; std::promise<void> can_finish; std::promise<void> finished; }; TEST_F(ReactorTest, start_and_stop) { auto reactor_thread = std::thread(&Reactor::Run, reactor_); reactor_->Stop(); Loading Loading @@ -186,6 +213,22 @@ TEST_F(ReactorTest, hot_register_from_different_thread) { reactor_->Unregister(reactable); } TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_) { FakeRunningReactable fake_reactable; auto* reactable = reactor_->Register(fake_reactable.fd_, std::bind(&FakeRunningReactable::OnReadReady, &fake_reactable), nullptr); auto reactor_thread = std::thread(&Reactor::Run, reactor_); auto write_result = eventfd_write(fake_reactable.fd_, 1); ASSERT_EQ(write_result, 0); fake_reactable.started.get_future().wait(); reactor_->Unregister(reactable); fake_reactable.can_finish.set_value(); fake_reactable.finished.get_future().wait(); reactor_->Stop(); reactor_thread.join(); } TEST_F(ReactorTest, hot_unregister_from_different_thread) { FakeReactable fake_reactable; auto* reactable = Loading system/gd/os/reactor.h +0 −1 Original line number Diff line number Diff line Loading @@ -71,7 +71,6 @@ class Reactor { int control_fd_; std::atomic<bool> is_running_; std::list<Reactable*> invalidation_list_; bool reactable_removed_; }; } // namespace os Loading Loading
system/gd/os/linux_generic/reactor.cc +26 −26 Original line number Diff line number Diff line Loading @@ -42,19 +42,20 @@ class Reactor::Reactable { : fd_(fd), on_read_ready_(std::move(on_read_ready)), on_write_ready_(std::move(on_write_ready)), is_executing_(false) {} is_executing_(false), removed_(false) {} const int fd_; Closure on_read_ready_; Closure on_write_ready_; bool is_executing_; std::recursive_mutex lock_; bool removed_; std::mutex mutex_; }; Reactor::Reactor() : epoll_fd_(0), control_fd_(0), is_running_(false), reactable_removed_(false) { is_running_(false) { RUN_NO_INTR(epoll_fd_ = epoll_create1(EPOLL_CLOEXEC)); ASSERT_LOG(epoll_fd_ != -1, "could not create epoll fd: %s", strerror(errno)); Loading Loading @@ -105,28 +106,27 @@ void Reactor::Run() { return; } auto* reactable = static_cast<Reactor::Reactable*>(event.data.ptr); { std::unique_lock<std::mutex> lock(mutex_); // See if this reactable has been removed in the meantime. if (std::find(invalidation_list_.begin(), invalidation_list_.end(), reactable) != invalidation_list_.end()) { continue; } std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_); { std::unique_lock<std::mutex> reactable_lock(reactable->mutex_); lock.unlock(); reactable_removed_ = false; reactable->is_executing_ = true; } if (event.events & (EPOLLIN | EPOLLHUP | EPOLLRDHUP | EPOLLERR) && reactable->on_read_ready_ != nullptr) { reactable->on_read_ready_(); } if (!reactable_removed_ && event.events & EPOLLOUT && reactable->on_write_ready_ != nullptr) { if (event.events & EPOLLOUT && reactable->on_write_ready_ != nullptr) { reactable->on_write_ready_(); } std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); reactable->is_executing_ = false; } if (reactable_removed_) { if (reactable->removed_) { delete reactable; reactable_removed_ = false; } } } Loading Loading @@ -168,7 +168,7 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { bool delaying_delete_until_callback_finished = false; { int result; std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_); std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); RUN_NO_INTR(result = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, reactable->fd_, nullptr)); if (result == -1 && errno == ENOENT) { LOG_INFO("reactable is invalid or unregistered"); Loading @@ -176,10 +176,10 @@ void Reactor::Unregister(Reactor::Reactable* reactable) { ASSERT(result != -1); } // If we are unregistering during the callback event from this reactable, we delete it after the callback is executed. // reactable->is_executing_ is protected by reactable->lock_, so it's thread safe. // If we are unregistering during the callback event from this reactable, we delete it after the callback is // executed. reactable->is_executing_ is protected by reactable->mutex_, so it's thread safe. if (reactable->is_executing_) { reactable_removed_ = true; reactable->removed_ = true; delaying_delete_until_callback_finished = true; } } Loading @@ -200,7 +200,7 @@ void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ poll_event_type |= EPOLLOUT; } { std::lock_guard<std::recursive_mutex> reactable_lock(reactable->lock_); std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); reactable->on_read_ready_ = std::move(on_read_ready); reactable->on_write_ready_ = std::move(on_write_ready); } Loading
system/gd/os/linux_generic/reactor_unittest.cc +43 −0 Original line number Diff line number Diff line Loading @@ -126,6 +126,33 @@ class FakeReactable { uint64_t output_data_ = kSampleOutputValue; }; class FakeRunningReactable { public: FakeRunningReactable() : fd_(eventfd(0, 0)) { EXPECT_NE(fd_, -1); } ~FakeRunningReactable() { close(fd_); } void OnReadReady() { uint64_t value = 0; auto read_result = eventfd_read(fd_, &value); ASSERT_EQ(read_result, 0); started.set_value(); can_finish.get_future().wait(); finished.set_value(); } Reactor::Reactable* reactable_ = nullptr; int fd_; std::promise<void> started; std::promise<void> can_finish; std::promise<void> finished; }; TEST_F(ReactorTest, start_and_stop) { auto reactor_thread = std::thread(&Reactor::Run, reactor_); reactor_->Stop(); Loading Loading @@ -186,6 +213,22 @@ TEST_F(ReactorTest, hot_register_from_different_thread) { reactor_->Unregister(reactable); } TEST_F(ReactorTest, unregister_from_different_thread_while_task_is_executing_) { FakeRunningReactable fake_reactable; auto* reactable = reactor_->Register(fake_reactable.fd_, std::bind(&FakeRunningReactable::OnReadReady, &fake_reactable), nullptr); auto reactor_thread = std::thread(&Reactor::Run, reactor_); auto write_result = eventfd_write(fake_reactable.fd_, 1); ASSERT_EQ(write_result, 0); fake_reactable.started.get_future().wait(); reactor_->Unregister(reactable); fake_reactable.can_finish.set_value(); fake_reactable.finished.get_future().wait(); reactor_->Stop(); reactor_thread.join(); } TEST_F(ReactorTest, hot_unregister_from_different_thread) { FakeReactable fake_reactable; auto* reactable = Loading
system/gd/os/reactor.h +0 −1 Original line number Diff line number Diff line Loading @@ -71,7 +71,6 @@ class Reactor { int control_fd_; std::atomic<bool> is_running_; std::list<Reactable*> invalidation_list_; bool reactable_removed_; }; } // namespace os Loading