Loading system/gd/hal/hci_hal_host.cc +11 −13 Original line number Diff line number Diff line Loading @@ -278,7 +278,10 @@ class HciHalHost : public HciHal { sock_fd_ = ConnectToSocket(); ASSERT(sock_fd_ != INVALID_FD); reactable_ = hci_incoming_thread_.GetReactor()->Register( sock_fd_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Closure()); sock_fd_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Bind(&HciHalHost::send_packet_ready, common::Unretained(this))); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_ONLY); btsnoop_logger_ = GetDependency<SnoopLogger>(); LOG_INFO("HAL opened successfully"); } Loading Loading @@ -324,26 +327,21 @@ class HciHalHost : public HciHal { // TODO: replace this with new queue when it's ready hci_outgoing_queue_.emplace(packet); if (hci_outgoing_queue_.size() == 1) { hci_incoming_thread_.GetReactor()->ModifyRegistration( reactable_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Bind(&HciHalHost::send_packet_ready, common::Unretained(this))); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_WRITE); } } void send_packet_ready() { std::lock_guard<std::mutex> lock(this->api_mutex_); auto packet_to_send = this->hci_outgoing_queue_.front(); auto bytes_written = write(this->sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); this->hci_outgoing_queue_.pop(); std::lock_guard<std::mutex> lock(api_mutex_); if (hci_outgoing_queue_.empty()) return; auto packet_to_send = hci_outgoing_queue_.front(); auto bytes_written = write(sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); hci_outgoing_queue_.pop(); if (bytes_written == -1) { abort(); } if (hci_outgoing_queue_.empty()) { this->hci_incoming_thread_.GetReactor()->ModifyRegistration( this->reactable_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Closure()); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_ONLY); } } Loading system/gd/hal/hci_hal_host_rootcanal.cc +11 −13 Original line number Diff line number Diff line Loading @@ -168,7 +168,10 @@ class HciHalHost : public HciHal { sock_fd_ = ConnectToSocket(); ASSERT(sock_fd_ != INVALID_FD); reactable_ = hci_incoming_thread_.GetReactor()->Register( sock_fd_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Closure()); sock_fd_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Bind(&HciHalHost::send_packet_ready, common::Unretained(this))); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_ONLY); btsnoop_logger_ = GetDependency<SnoopLogger>(); LOG_INFO("HAL opened successfully"); } Loading Loading @@ -214,26 +217,21 @@ class HciHalHost : public HciHal { // TODO: replace this with new queue when it's ready hci_outgoing_queue_.emplace(packet); if (hci_outgoing_queue_.size() == 1) { hci_incoming_thread_.GetReactor()->ModifyRegistration( reactable_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Bind(&HciHalHost::send_packet_ready, common::Unretained(this))); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_WRITE); } } void send_packet_ready() { std::lock_guard<std::mutex> lock(this->api_mutex_); auto packet_to_send = this->hci_outgoing_queue_.front(); auto bytes_written = write(this->sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); this->hci_outgoing_queue_.pop(); std::lock_guard<std::mutex> lock(api_mutex_); if (hci_outgoing_queue_.empty()) return; auto packet_to_send = hci_outgoing_queue_.front(); auto bytes_written = write(sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); hci_outgoing_queue_.pop(); if (bytes_written == -1) { abort(); } if (hci_outgoing_queue_.empty()) { this->hci_incoming_thread_.GetReactor()->ModifyRegistration( this->reactable_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Closure()); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_ONLY); } } Loading system/gd/os/linux_generic/reactor.cc +3 −8 Original line number Diff line number Diff line Loading @@ -288,21 +288,16 @@ bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) { return idle_status == std::future_status::ready; } void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) { void Reactor::ModifyRegistration(Reactor::Reactable* reactable, ReactOn react_on) { ASSERT(reactable != nullptr); uint32_t poll_event_type = 0; if (!on_read_ready.is_null()) { if (react_on == REACT_ON_READ_ONLY || react_on == REACT_ON_READ_WRITE) { poll_event_type |= (EPOLLIN | EPOLLRDHUP); } if (!on_write_ready.is_null()) { if (react_on == REACT_ON_WRITE_ONLY || react_on == REACT_ON_READ_WRITE) { poll_event_type |= EPOLLOUT; } { std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); reactable->on_read_ready_ = std::move(on_read_ready); reactable->on_write_ready_ = std::move(on_write_ready); } epoll_event event = { .events = poll_event_type, .data = {.ptr = reactable}, Loading system/gd/os/linux_generic/reactor_unittest.cc +23 −7 Original line number Diff line number Diff line Loading @@ -417,14 +417,30 @@ TEST_F(ReactorTest, on_write_ready) { TEST_F(ReactorTest, modify_registration) { FakeReactable fake_reactable; auto* reactable = reactor_->Register( fake_reactable.fd_, Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)), common::Closure()); reactor_->ModifyRegistration( reactable, common::Closure(), Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake_reactable))); fake_reactable.fd_, Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)), Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake_reactable))); auto reactor_thread = std::thread(&Reactor::Run, reactor_); uint64_t value = 0; auto read_result = eventfd_read(fake_reactable.fd_, &value); EXPECT_EQ(read_result, 0); EXPECT_EQ(value, FakeReactable::kSampleOutputValue); using namespace std::chrono_literals; auto future = g_promise->get_future(); auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise); ASSERT_EQ(write_result, 0); ASSERT_EQ(future.wait_for(10ms), std::future_status::ready); ASSERT_EQ(future.get(), kReadReadyValue); /* Disable on_read callback */ reactor_->ModifyRegistration(reactable, Reactor::REACT_ON_WRITE_ONLY); delete g_promise; g_promise = new std::promise<int>; future = g_promise->get_future(); write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise); ASSERT_EQ(write_result, 0); ASSERT_NE(future.wait_for(10ms), std::future_status::ready); reactor_->Stop(); reactor_thread.join(); Loading system/gd/os/reactor.h +8 −2 Original line number Diff line number Diff line Loading @@ -70,8 +70,14 @@ class Reactor { // Wait for up to timeout milliseconds, and return true if we reached idle. bool WaitForIdle(std::chrono::milliseconds timeout); // Modify the registration for a reactable with given reactable void ModifyRegistration(Reactable* reactable, common::Closure on_read_ready, common::Closure on_write_ready); enum ReactOn { REACT_ON_READ_ONLY, REACT_ON_WRITE_ONLY, REACT_ON_READ_WRITE, }; // Modify subscribed poll events on the fly void ModifyRegistration(Reactable* reactable, ReactOn react_on); class Event { public: Loading Loading
system/gd/hal/hci_hal_host.cc +11 −13 Original line number Diff line number Diff line Loading @@ -278,7 +278,10 @@ class HciHalHost : public HciHal { sock_fd_ = ConnectToSocket(); ASSERT(sock_fd_ != INVALID_FD); reactable_ = hci_incoming_thread_.GetReactor()->Register( sock_fd_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Closure()); sock_fd_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Bind(&HciHalHost::send_packet_ready, common::Unretained(this))); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_ONLY); btsnoop_logger_ = GetDependency<SnoopLogger>(); LOG_INFO("HAL opened successfully"); } Loading Loading @@ -324,26 +327,21 @@ class HciHalHost : public HciHal { // TODO: replace this with new queue when it's ready hci_outgoing_queue_.emplace(packet); if (hci_outgoing_queue_.size() == 1) { hci_incoming_thread_.GetReactor()->ModifyRegistration( reactable_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Bind(&HciHalHost::send_packet_ready, common::Unretained(this))); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_WRITE); } } void send_packet_ready() { std::lock_guard<std::mutex> lock(this->api_mutex_); auto packet_to_send = this->hci_outgoing_queue_.front(); auto bytes_written = write(this->sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); this->hci_outgoing_queue_.pop(); std::lock_guard<std::mutex> lock(api_mutex_); if (hci_outgoing_queue_.empty()) return; auto packet_to_send = hci_outgoing_queue_.front(); auto bytes_written = write(sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); hci_outgoing_queue_.pop(); if (bytes_written == -1) { abort(); } if (hci_outgoing_queue_.empty()) { this->hci_incoming_thread_.GetReactor()->ModifyRegistration( this->reactable_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Closure()); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_ONLY); } } Loading
system/gd/hal/hci_hal_host_rootcanal.cc +11 −13 Original line number Diff line number Diff line Loading @@ -168,7 +168,10 @@ class HciHalHost : public HciHal { sock_fd_ = ConnectToSocket(); ASSERT(sock_fd_ != INVALID_FD); reactable_ = hci_incoming_thread_.GetReactor()->Register( sock_fd_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Closure()); sock_fd_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Bind(&HciHalHost::send_packet_ready, common::Unretained(this))); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_ONLY); btsnoop_logger_ = GetDependency<SnoopLogger>(); LOG_INFO("HAL opened successfully"); } Loading Loading @@ -214,26 +217,21 @@ class HciHalHost : public HciHal { // TODO: replace this with new queue when it's ready hci_outgoing_queue_.emplace(packet); if (hci_outgoing_queue_.size() == 1) { hci_incoming_thread_.GetReactor()->ModifyRegistration( reactable_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Bind(&HciHalHost::send_packet_ready, common::Unretained(this))); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_WRITE); } } void send_packet_ready() { std::lock_guard<std::mutex> lock(this->api_mutex_); auto packet_to_send = this->hci_outgoing_queue_.front(); auto bytes_written = write(this->sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); this->hci_outgoing_queue_.pop(); std::lock_guard<std::mutex> lock(api_mutex_); if (hci_outgoing_queue_.empty()) return; auto packet_to_send = hci_outgoing_queue_.front(); auto bytes_written = write(sock_fd_, (void*)packet_to_send.data(), packet_to_send.size()); hci_outgoing_queue_.pop(); if (bytes_written == -1) { abort(); } if (hci_outgoing_queue_.empty()) { this->hci_incoming_thread_.GetReactor()->ModifyRegistration( this->reactable_, common::Bind(&HciHalHost::incoming_packet_received, common::Unretained(this)), common::Closure()); hci_incoming_thread_.GetReactor()->ModifyRegistration(reactable_, os::Reactor::REACT_ON_READ_ONLY); } } Loading
system/gd/os/linux_generic/reactor.cc +3 −8 Original line number Diff line number Diff line Loading @@ -288,21 +288,16 @@ bool Reactor::WaitForIdle(std::chrono::milliseconds timeout) { return idle_status == std::future_status::ready; } void Reactor::ModifyRegistration(Reactor::Reactable* reactable, Closure on_read_ready, Closure on_write_ready) { void Reactor::ModifyRegistration(Reactor::Reactable* reactable, ReactOn react_on) { ASSERT(reactable != nullptr); uint32_t poll_event_type = 0; if (!on_read_ready.is_null()) { if (react_on == REACT_ON_READ_ONLY || react_on == REACT_ON_READ_WRITE) { poll_event_type |= (EPOLLIN | EPOLLRDHUP); } if (!on_write_ready.is_null()) { if (react_on == REACT_ON_WRITE_ONLY || react_on == REACT_ON_READ_WRITE) { poll_event_type |= EPOLLOUT; } { std::lock_guard<std::mutex> reactable_lock(reactable->mutex_); reactable->on_read_ready_ = std::move(on_read_ready); reactable->on_write_ready_ = std::move(on_write_ready); } epoll_event event = { .events = poll_event_type, .data = {.ptr = reactable}, Loading
system/gd/os/linux_generic/reactor_unittest.cc +23 −7 Original line number Diff line number Diff line Loading @@ -417,14 +417,30 @@ TEST_F(ReactorTest, on_write_ready) { TEST_F(ReactorTest, modify_registration) { FakeReactable fake_reactable; auto* reactable = reactor_->Register( fake_reactable.fd_, Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)), common::Closure()); reactor_->ModifyRegistration( reactable, common::Closure(), Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake_reactable))); fake_reactable.fd_, Bind(&FakeReactable::OnReadReady, common::Unretained(&fake_reactable)), Bind(&FakeReactable::OnWriteReady, common::Unretained(&fake_reactable))); auto reactor_thread = std::thread(&Reactor::Run, reactor_); uint64_t value = 0; auto read_result = eventfd_read(fake_reactable.fd_, &value); EXPECT_EQ(read_result, 0); EXPECT_EQ(value, FakeReactable::kSampleOutputValue); using namespace std::chrono_literals; auto future = g_promise->get_future(); auto write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise); ASSERT_EQ(write_result, 0); ASSERT_EQ(future.wait_for(10ms), std::future_status::ready); ASSERT_EQ(future.get(), kReadReadyValue); /* Disable on_read callback */ reactor_->ModifyRegistration(reactable, Reactor::REACT_ON_WRITE_ONLY); delete g_promise; g_promise = new std::promise<int>; future = g_promise->get_future(); write_result = eventfd_write(fake_reactable.fd_, FakeReactable::kSetPromise); ASSERT_EQ(write_result, 0); ASSERT_NE(future.wait_for(10ms), std::future_status::ready); reactor_->Stop(); reactor_thread.join(); Loading
system/gd/os/reactor.h +8 −2 Original line number Diff line number Diff line Loading @@ -70,8 +70,14 @@ class Reactor { // Wait for up to timeout milliseconds, and return true if we reached idle. bool WaitForIdle(std::chrono::milliseconds timeout); // Modify the registration for a reactable with given reactable void ModifyRegistration(Reactable* reactable, common::Closure on_read_ready, common::Closure on_write_ready); enum ReactOn { REACT_ON_READ_ONLY, REACT_ON_WRITE_ONLY, REACT_ON_READ_WRITE, }; // Modify subscribed poll events on the fly void ModifyRegistration(Reactable* reactable, ReactOn react_on); class Event { public: Loading