Loading adb/fdevent/fdevent.cpp +26 −0 Original line number Diff line number Diff line Loading @@ -49,6 +49,32 @@ std::string dump_fde(const fdevent* fde) { state.c_str()); } void fdevent_context::Run(std::function<void()> fn) { { std::lock_guard<std::mutex> lock(run_queue_mutex_); run_queue_.push_back(std::move(fn)); } Interrupt(); } void fdevent_context::FlushRunQueue() { // We need to be careful around reentrancy here, since a function we call can queue up another // function. while (true) { std::function<void()> fn; { std::lock_guard<std::mutex> lock(this->run_queue_mutex_); if (this->run_queue_.empty()) { break; } fn = this->run_queue_.front(); this->run_queue_.pop_front(); } fn(); } } static auto& g_ambient_fdevent_context = *new std::unique_ptr<fdevent_context>(new fdevent_context_poll()); Loading adb/fdevent/fdevent.h +18 −1 Original line number Diff line number Diff line Loading @@ -21,10 +21,14 @@ #include <stdint.h> #include <chrono> #include <deque> #include <functional> #include <mutex> #include <optional> #include <variant> #include <android-base/thread_annotations.h> #include "adb_unique_fd.h" // Events that may be observed Loading @@ -48,6 +52,7 @@ struct fdevent; std::string dump_fde(const fdevent* fde); struct fdevent_context { public: virtual ~fdevent_context() = default; // Allocate and initialize a new fdevent object. Loading @@ -68,17 +73,29 @@ struct fdevent_context { virtual void SetTimeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) = 0; // Loop forever, handling events. // Implementations should call FlushRunQueue on every iteration. virtual void Loop() = 0; // Assert that the caller is running on the context's main thread. virtual void CheckMainThread() = 0; // Queue an operation to be run on the main thread. virtual void Run(std::function<void()> fn) = 0; void Run(std::function<void()> fn); // Test-only functionality: virtual void TerminateLoop() = 0; virtual size_t InstalledCount() = 0; protected: // Interrupt the run loop. virtual void Interrupt() = 0; // Run all pending functions enqueued via Run(). void FlushRunQueue() EXCLUDES(run_queue_mutex_); private: std::mutex run_queue_mutex_; std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_); }; struct fdevent { Loading adb/fdevent/fdevent_poll.cpp +44 −79 Original line number Diff line number Diff line Loading @@ -50,6 +50,35 @@ #include "fdevent.h" #include "sysdeps/chrono.h" static void fdevent_interrupt(int fd, unsigned, void*) { char buf[BUFSIZ]; ssize_t rc = TEMP_FAILURE_RETRY(adb_read(fd, buf, sizeof(buf))); if (rc == -1) { PLOG(FATAL) << "failed to read from fdevent interrupt fd"; } } fdevent_context_poll::fdevent_context_poll() { int s[2]; if (adb_socketpair(s) != 0) { PLOG(FATAL) << "failed to create fdevent interrupt socketpair"; } if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) { PLOG(FATAL) << "failed to make fdevent interrupt socket nonblocking"; } this->interrupt_fd_.reset(s[0]); fdevent* fde = this->Create(unique_fd(s[1]), fdevent_interrupt, nullptr); CHECK(fde != nullptr); this->Add(fde, FDE_READ); } fdevent_context_poll::~fdevent_context_poll() { main_thread_valid_ = false; this->Destroy(this->interrupt_fde_); } void fdevent_context_poll::CheckMainThread() { if (main_thread_valid_) { CHECK_EQ(main_thread_id_, android::base::GetThreadId()); Loading Loading @@ -291,79 +320,6 @@ static void fdevent_call_fdfunc(fdevent* fde) { fde->func); } static void fdevent_run_flush(fdevent_context_poll* ctx) EXCLUDES(ctx->run_queue_mutex_) { // We need to be careful around reentrancy here, since a function we call can queue up another // function. while (true) { std::function<void()> fn; { std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_); if (ctx->run_queue_.empty()) { break; } fn = ctx->run_queue_.front(); ctx->run_queue_.pop_front(); } fn(); } } static void fdevent_run_func(int fd, unsigned ev, void* data) { CHECK_GE(fd, 0); CHECK(ev & FDE_READ); bool* run_needs_flush = static_cast<bool*>(data); char buf[1024]; // Empty the fd. if (adb_read(fd, buf, sizeof(buf)) == -1) { PLOG(FATAL) << "failed to empty run queue notify fd"; } // Mark that we need to flush, and then run it at the end of fdevent_loop. *run_needs_flush = true; } static void fdevent_run_setup(fdevent_context_poll* ctx) { { std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_); CHECK(ctx->run_queue_notify_fd_.get() == -1); int s[2]; if (adb_socketpair(s) != 0) { PLOG(FATAL) << "failed to create run queue notify socketpair"; } if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) { PLOG(FATAL) << "failed to make run queue notify socket nonblocking"; } ctx->run_queue_notify_fd_.reset(s[0]); fdevent* fde = ctx->Create(unique_fd(s[1]), fdevent_run_func, &ctx->run_needs_flush_); CHECK(fde != nullptr); ctx->Add(fde, FDE_READ); } fdevent_run_flush(ctx); } void fdevent_context_poll::Run(std::function<void()> fn) { std::lock_guard<std::mutex> lock(run_queue_mutex_); run_queue_.push_back(std::move(fn)); // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up. // In that case, rely on the setup code to flush the queue without a notification being needed. if (run_queue_notify_fd_ != -1) { int rc = adb_write(run_queue_notify_fd_.get(), "", 1); // It's possible that we get EAGAIN here, if lots of notifications came in while handling. if (rc == 0) { PLOG(FATAL) << "run queue notify fd was closed?"; } else if (rc == -1 && errno != EAGAIN) { PLOG(FATAL) << "failed to write to run queue notify fd"; } } } static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) { // Check to see if we're spinning because we forgot about an fdevent // by keeping track of how long fdevents have been continuously pending. Loading Loading @@ -424,7 +380,6 @@ static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) { void fdevent_context_poll::Loop() { this->main_thread_id_ = android::base::GetThreadId(); this->main_thread_valid_ = true; fdevent_run_setup(this); uint64_t cycle = 0; while (true) { Loading @@ -444,17 +399,27 @@ void fdevent_context_poll::Loop() { fdevent_call_fdfunc(fde); } if (run_needs_flush_) { fdevent_run_flush(this); run_needs_flush_ = false; } this->FlushRunQueue(); } } void fdevent_context_poll::TerminateLoop() { terminate_loop_ = true; Interrupt(); } size_t fdevent_context_poll::InstalledCount() { return poll_node_map_.size(); // We always have an installed fde for interrupt. return poll_node_map_.size() - 1; } void fdevent_context_poll::Interrupt() { int rc = adb_write(this->interrupt_fd_, "", 1); // It's possible that we get EAGAIN here, if lots of notifications came in while handling. if (rc == 0) { PLOG(FATAL) << "fdevent interrupt fd was closed?"; } else if (rc == -1 && errno != EAGAIN) { PLOG(FATAL) << "failed to write to fdevent interrupt fd"; } } adb/fdevent/fdevent_poll.h +9 −8 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ #include <android-base/thread_annotations.h> #include "adb_unique_fd.h" #include "fdevent.h" struct PollNode { Loading @@ -44,7 +45,8 @@ struct PollNode { }; struct fdevent_context_poll : public fdevent_context { virtual ~fdevent_context_poll() = default; fdevent_context_poll(); virtual ~fdevent_context_poll(); virtual fdevent* Create(unique_fd fd, std::variant<fd_func, fd_func2> func, void* arg) final; virtual unique_fd Destroy(fdevent* fde) final; Loading @@ -58,11 +60,13 @@ struct fdevent_context_poll : public fdevent_context { virtual void CheckMainThread() final; virtual void Run(std::function<void()> fn) final; virtual void TerminateLoop() final; virtual size_t InstalledCount() final; protected: virtual void Interrupt() final; public: // All operations to fdevent should happen only in the main thread. // That's why we don't need a lock for fdevent. std::unordered_map<int, PollNode> poll_node_map_; Loading @@ -71,10 +75,7 @@ struct fdevent_context_poll : public fdevent_context { uint64_t main_thread_id_ = 0; uint64_t fdevent_id_ = 0; bool run_needs_flush_ = false; unique_fd run_queue_notify_fd_; std::mutex run_queue_mutex_; std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_); unique_fd interrupt_fd_; fdevent* interrupt_fde_ = nullptr; std::atomic<bool> terminate_loop_ = false; }; adb/fdevent/fdevent_test.h +2 −2 Original line number Diff line number Diff line Loading @@ -78,8 +78,8 @@ class FdeventTest : public ::testing::Test { } size_t GetAdditionalLocalSocketCount() { // dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket return 2; // dummy socket installed in PrepareThread() return 1; } void TerminateThread() { Loading Loading
adb/fdevent/fdevent.cpp +26 −0 Original line number Diff line number Diff line Loading @@ -49,6 +49,32 @@ std::string dump_fde(const fdevent* fde) { state.c_str()); } void fdevent_context::Run(std::function<void()> fn) { { std::lock_guard<std::mutex> lock(run_queue_mutex_); run_queue_.push_back(std::move(fn)); } Interrupt(); } void fdevent_context::FlushRunQueue() { // We need to be careful around reentrancy here, since a function we call can queue up another // function. while (true) { std::function<void()> fn; { std::lock_guard<std::mutex> lock(this->run_queue_mutex_); if (this->run_queue_.empty()) { break; } fn = this->run_queue_.front(); this->run_queue_.pop_front(); } fn(); } } static auto& g_ambient_fdevent_context = *new std::unique_ptr<fdevent_context>(new fdevent_context_poll()); Loading
adb/fdevent/fdevent.h +18 −1 Original line number Diff line number Diff line Loading @@ -21,10 +21,14 @@ #include <stdint.h> #include <chrono> #include <deque> #include <functional> #include <mutex> #include <optional> #include <variant> #include <android-base/thread_annotations.h> #include "adb_unique_fd.h" // Events that may be observed Loading @@ -48,6 +52,7 @@ struct fdevent; std::string dump_fde(const fdevent* fde); struct fdevent_context { public: virtual ~fdevent_context() = default; // Allocate and initialize a new fdevent object. Loading @@ -68,17 +73,29 @@ struct fdevent_context { virtual void SetTimeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) = 0; // Loop forever, handling events. // Implementations should call FlushRunQueue on every iteration. virtual void Loop() = 0; // Assert that the caller is running on the context's main thread. virtual void CheckMainThread() = 0; // Queue an operation to be run on the main thread. virtual void Run(std::function<void()> fn) = 0; void Run(std::function<void()> fn); // Test-only functionality: virtual void TerminateLoop() = 0; virtual size_t InstalledCount() = 0; protected: // Interrupt the run loop. virtual void Interrupt() = 0; // Run all pending functions enqueued via Run(). void FlushRunQueue() EXCLUDES(run_queue_mutex_); private: std::mutex run_queue_mutex_; std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_); }; struct fdevent { Loading
adb/fdevent/fdevent_poll.cpp +44 −79 Original line number Diff line number Diff line Loading @@ -50,6 +50,35 @@ #include "fdevent.h" #include "sysdeps/chrono.h" static void fdevent_interrupt(int fd, unsigned, void*) { char buf[BUFSIZ]; ssize_t rc = TEMP_FAILURE_RETRY(adb_read(fd, buf, sizeof(buf))); if (rc == -1) { PLOG(FATAL) << "failed to read from fdevent interrupt fd"; } } fdevent_context_poll::fdevent_context_poll() { int s[2]; if (adb_socketpair(s) != 0) { PLOG(FATAL) << "failed to create fdevent interrupt socketpair"; } if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) { PLOG(FATAL) << "failed to make fdevent interrupt socket nonblocking"; } this->interrupt_fd_.reset(s[0]); fdevent* fde = this->Create(unique_fd(s[1]), fdevent_interrupt, nullptr); CHECK(fde != nullptr); this->Add(fde, FDE_READ); } fdevent_context_poll::~fdevent_context_poll() { main_thread_valid_ = false; this->Destroy(this->interrupt_fde_); } void fdevent_context_poll::CheckMainThread() { if (main_thread_valid_) { CHECK_EQ(main_thread_id_, android::base::GetThreadId()); Loading Loading @@ -291,79 +320,6 @@ static void fdevent_call_fdfunc(fdevent* fde) { fde->func); } static void fdevent_run_flush(fdevent_context_poll* ctx) EXCLUDES(ctx->run_queue_mutex_) { // We need to be careful around reentrancy here, since a function we call can queue up another // function. while (true) { std::function<void()> fn; { std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_); if (ctx->run_queue_.empty()) { break; } fn = ctx->run_queue_.front(); ctx->run_queue_.pop_front(); } fn(); } } static void fdevent_run_func(int fd, unsigned ev, void* data) { CHECK_GE(fd, 0); CHECK(ev & FDE_READ); bool* run_needs_flush = static_cast<bool*>(data); char buf[1024]; // Empty the fd. if (adb_read(fd, buf, sizeof(buf)) == -1) { PLOG(FATAL) << "failed to empty run queue notify fd"; } // Mark that we need to flush, and then run it at the end of fdevent_loop. *run_needs_flush = true; } static void fdevent_run_setup(fdevent_context_poll* ctx) { { std::lock_guard<std::mutex> lock(ctx->run_queue_mutex_); CHECK(ctx->run_queue_notify_fd_.get() == -1); int s[2]; if (adb_socketpair(s) != 0) { PLOG(FATAL) << "failed to create run queue notify socketpair"; } if (!set_file_block_mode(s[0], false) || !set_file_block_mode(s[1], false)) { PLOG(FATAL) << "failed to make run queue notify socket nonblocking"; } ctx->run_queue_notify_fd_.reset(s[0]); fdevent* fde = ctx->Create(unique_fd(s[1]), fdevent_run_func, &ctx->run_needs_flush_); CHECK(fde != nullptr); ctx->Add(fde, FDE_READ); } fdevent_run_flush(ctx); } void fdevent_context_poll::Run(std::function<void()> fn) { std::lock_guard<std::mutex> lock(run_queue_mutex_); run_queue_.push_back(std::move(fn)); // run_queue_notify_fd could still be -1 if we're called before fdevent has finished setting up. // In that case, rely on the setup code to flush the queue without a notification being needed. if (run_queue_notify_fd_ != -1) { int rc = adb_write(run_queue_notify_fd_.get(), "", 1); // It's possible that we get EAGAIN here, if lots of notifications came in while handling. if (rc == 0) { PLOG(FATAL) << "run queue notify fd was closed?"; } else if (rc == -1 && errno != EAGAIN) { PLOG(FATAL) << "failed to write to run queue notify fd"; } } } static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) { // Check to see if we're spinning because we forgot about an fdevent // by keeping track of how long fdevents have been continuously pending. Loading Loading @@ -424,7 +380,6 @@ static void fdevent_check_spin(fdevent_context_poll* ctx, uint64_t cycle) { void fdevent_context_poll::Loop() { this->main_thread_id_ = android::base::GetThreadId(); this->main_thread_valid_ = true; fdevent_run_setup(this); uint64_t cycle = 0; while (true) { Loading @@ -444,17 +399,27 @@ void fdevent_context_poll::Loop() { fdevent_call_fdfunc(fde); } if (run_needs_flush_) { fdevent_run_flush(this); run_needs_flush_ = false; } this->FlushRunQueue(); } } void fdevent_context_poll::TerminateLoop() { terminate_loop_ = true; Interrupt(); } size_t fdevent_context_poll::InstalledCount() { return poll_node_map_.size(); // We always have an installed fde for interrupt. return poll_node_map_.size() - 1; } void fdevent_context_poll::Interrupt() { int rc = adb_write(this->interrupt_fd_, "", 1); // It's possible that we get EAGAIN here, if lots of notifications came in while handling. if (rc == 0) { PLOG(FATAL) << "fdevent interrupt fd was closed?"; } else if (rc == -1 && errno != EAGAIN) { PLOG(FATAL) << "failed to write to fdevent interrupt fd"; } }
adb/fdevent/fdevent_poll.h +9 −8 Original line number Diff line number Diff line Loading @@ -25,6 +25,7 @@ #include <android-base/thread_annotations.h> #include "adb_unique_fd.h" #include "fdevent.h" struct PollNode { Loading @@ -44,7 +45,8 @@ struct PollNode { }; struct fdevent_context_poll : public fdevent_context { virtual ~fdevent_context_poll() = default; fdevent_context_poll(); virtual ~fdevent_context_poll(); virtual fdevent* Create(unique_fd fd, std::variant<fd_func, fd_func2> func, void* arg) final; virtual unique_fd Destroy(fdevent* fde) final; Loading @@ -58,11 +60,13 @@ struct fdevent_context_poll : public fdevent_context { virtual void CheckMainThread() final; virtual void Run(std::function<void()> fn) final; virtual void TerminateLoop() final; virtual size_t InstalledCount() final; protected: virtual void Interrupt() final; public: // All operations to fdevent should happen only in the main thread. // That's why we don't need a lock for fdevent. std::unordered_map<int, PollNode> poll_node_map_; Loading @@ -71,10 +75,7 @@ struct fdevent_context_poll : public fdevent_context { uint64_t main_thread_id_ = 0; uint64_t fdevent_id_ = 0; bool run_needs_flush_ = false; unique_fd run_queue_notify_fd_; std::mutex run_queue_mutex_; std::deque<std::function<void()>> run_queue_ GUARDED_BY(run_queue_mutex_); unique_fd interrupt_fd_; fdevent* interrupt_fde_ = nullptr; std::atomic<bool> terminate_loop_ = false; };
adb/fdevent/fdevent_test.h +2 −2 Original line number Diff line number Diff line Loading @@ -78,8 +78,8 @@ class FdeventTest : public ::testing::Test { } size_t GetAdditionalLocalSocketCount() { // dummy socket installed in PrepareThread() + fdevent_run_on_main_thread socket return 2; // dummy socket installed in PrepareThread() return 1; } void TerminateThread() { Loading