Loading adb/fdevent.cpp +27 −8 Original line number Diff line number Diff line Loading @@ -33,6 +33,8 @@ #include <list> #include <mutex> #include <unordered_map> #include <utility> #include <variant> #include <vector> #include <android-base/chrono_utils.h> Loading Loading @@ -121,13 +123,8 @@ static std::string dump_fde(const fdevent* fde) { state.c_str()); } void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) { check_main_thread(); CHECK_GE(fd, 0); memset(fde, 0, sizeof(fdevent)); } fdevent* fdevent_create(int fd, fd_func func, void* arg) { template <typename F> static fdevent* fdevent_create_impl(int fd, F func, void* arg) { check_main_thread(); CHECK_GE(fd, 0); Loading @@ -150,6 +147,14 @@ fdevent* fdevent_create(int fd, fd_func func, void* arg) { return fde; } fdevent* fdevent_create(int fd, fd_func func, void* arg) { return fdevent_create_impl(fd, func, arg); } fdevent* fdevent_create(int fd, fd_func2 func, void* arg) { return fdevent_create_impl(fd, func, arg); } unique_fd fdevent_release(fdevent* fde) { check_main_thread(); if (!fde) { Loading Loading @@ -290,13 +295,27 @@ static void fdevent_process() { } } template <class T> struct always_false : std::false_type {}; static void fdevent_call_fdfunc(fdevent* fde) { unsigned events = fde->events; fde->events = 0; CHECK(fde->state & FDE_PENDING); fde->state &= (~FDE_PENDING); D("fdevent_call_fdfunc %s", dump_fde(fde).c_str()); fde->func(fde->fd.get(), events, fde->arg); std::visit( [&](auto&& f) { using F = std::decay_t<decltype(f)>; if constexpr (std::is_same_v<fd_func, F>) { f(fde->fd.get(), events, fde->arg); } else if constexpr (std::is_same_v<fd_func2, F>) { f(fde, events, fde->arg); } else { static_assert(always_false<F>::value, "non-exhaustive visitor"); } }, fde->func); } static void fdevent_run_flush() EXCLUDES(run_queue_mutex) { Loading adb/fdevent.h +8 −9 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <stdint.h> /* for int64_t */ #include <functional> #include <variant> #include "adb_unique_fd.h" Loading @@ -30,6 +31,7 @@ #define FDE_ERROR 0x0004 typedef void (*fd_func)(int fd, unsigned events, void *userdata); typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata); struct fdevent { uint64_t id; Loading @@ -40,15 +42,14 @@ struct fdevent { uint16_t state = 0; uint16_t events = 0; fd_func func = nullptr; std::variant<fd_func, fd_func2> func; void* arg = nullptr; }; /* Allocate and initialize a new fdevent object * Note: use FD_TIMER as 'fd' to create a fd-less object * (used to implement timers). */ // Allocate and initialize a new fdevent object // TODO: Switch these to unique_fd. fdevent *fdevent_create(int fd, fd_func func, void *arg); fdevent* fdevent_create(int fd, fd_func2 func, void* arg); // Deallocate an fdevent object that was created by fdevent_create. void fdevent_destroy(fdevent *fde); Loading @@ -56,16 +57,14 @@ void fdevent_destroy(fdevent *fde); // fdevent_destroy, except releasing the file descriptor previously owned by the fdevent. unique_fd fdevent_release(fdevent* fde); /* Change which events should cause notifications */ // Change which events should cause notifications void fdevent_set(fdevent *fde, unsigned events); void fdevent_add(fdevent *fde, unsigned events); void fdevent_del(fdevent *fde, unsigned events); void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms); /* loop forever, handling events. */ // Loop forever, handling events. void fdevent_loop(); void check_main_thread(); Loading adb/fdevent_test.cpp +82 −49 Original line number Diff line number Diff line Loading @@ -30,11 +30,17 @@ class FdHandler { public: FdHandler(int read_fd, int write_fd) : read_fd_(read_fd), write_fd_(write_fd) { FdHandler(int read_fd, int write_fd, bool use_new_callback) : read_fd_(read_fd), write_fd_(write_fd) { if (use_new_callback) { read_fde_ = fdevent_create(read_fd_, FdEventNewCallback, this); write_fde_ = fdevent_create(write_fd_, FdEventNewCallback, this); } else { read_fde_ = fdevent_create(read_fd_, FdEventCallback, this); fdevent_add(read_fde_, FDE_READ); write_fde_ = fdevent_create(write_fd_, FdEventCallback, this); } fdevent_add(read_fde_, FDE_READ); } ~FdHandler() { fdevent_destroy(read_fde_); Loading Loading @@ -64,6 +70,29 @@ class FdHandler { } } static void FdEventNewCallback(fdevent* fde, unsigned events, void* userdata) { int fd = fde->fd.get(); FdHandler* handler = reinterpret_cast<FdHandler*>(userdata); ASSERT_EQ(0u, (events & ~(FDE_READ | FDE_WRITE))) << "unexpected events: " << events; if (events & FDE_READ) { ASSERT_EQ(fd, handler->read_fd_); char c; ASSERT_EQ(1, adb_read(fd, &c, 1)); handler->queue_.push(c); fdevent_add(handler->write_fde_, FDE_WRITE); } if (events & FDE_WRITE) { ASSERT_EQ(fd, handler->write_fd_); ASSERT_FALSE(handler->queue_.empty()); char c = handler->queue_.front(); handler->queue_.pop(); ASSERT_EQ(1, adb_write(fd, &c, 1)); if (handler->queue_.empty()) { fdevent_del(handler->write_fde_, FDE_WRITE); } } } private: const int read_fd_; const int write_fd_; Loading @@ -84,6 +113,8 @@ TEST_F(FdeventTest, fdevent_terminate) { } TEST_F(FdeventTest, smoke) { for (bool use_new_callback : {true, false}) { fdevent_reset(); const size_t PIPE_COUNT = 10; const size_t MESSAGE_LOOP_COUNT = 100; const std::string MESSAGE = "fdevent_test"; Loading @@ -101,7 +132,7 @@ TEST_F(FdeventTest, smoke) { PrepareThread(); std::vector<std::unique_ptr<FdHandler>> fd_handlers; fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() { fdevent_run_on_main_thread([&thread_arg, &fd_handlers, use_new_callback]() { std::vector<int> read_fds; std::vector<int> write_fds; Loading @@ -115,7 +146,8 @@ TEST_F(FdeventTest, smoke) { write_fds.push_back(thread_arg.last_write_fd); for (size_t i = 0; i < read_fds.size(); ++i) { fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i])); fd_handlers.push_back( std::make_unique<FdHandler>(read_fds[i], write_fds[i], use_new_callback)); } }); WaitForFdeventLoop(); Loading @@ -135,6 +167,7 @@ TEST_F(FdeventTest, smoke) { ASSERT_EQ(0, adb_close(writer)); ASSERT_EQ(0, adb_close(reader)); } } struct InvalidFdArg { fdevent* fde; Loading base/include/android-base/unique_fd.h +5 −1 Original line number Diff line number Diff line Loading @@ -17,10 +17,10 @@ #pragma once #include <dirent.h> #include <errno.h> #include <fcntl.h> #if !defined(_WIN32) #include <dirent.h> #include <sys/socket.h> #endif Loading Loading @@ -114,6 +114,8 @@ class unique_fd_impl final { private: void reset(int new_value, void* previous_tag) { int previous_errno = errno; if (fd_ != -1) { close(fd_, this); } Loading @@ -122,6 +124,8 @@ class unique_fd_impl final { if (new_value != -1) { tag(new_value, previous_tag, this); } errno = previous_errno; } int fd_ = -1; Loading Loading
adb/fdevent.cpp +27 −8 Original line number Diff line number Diff line Loading @@ -33,6 +33,8 @@ #include <list> #include <mutex> #include <unordered_map> #include <utility> #include <variant> #include <vector> #include <android-base/chrono_utils.h> Loading Loading @@ -121,13 +123,8 @@ static std::string dump_fde(const fdevent* fde) { state.c_str()); } void fdevent_install(fdevent* fde, int fd, fd_func func, void* arg) { check_main_thread(); CHECK_GE(fd, 0); memset(fde, 0, sizeof(fdevent)); } fdevent* fdevent_create(int fd, fd_func func, void* arg) { template <typename F> static fdevent* fdevent_create_impl(int fd, F func, void* arg) { check_main_thread(); CHECK_GE(fd, 0); Loading @@ -150,6 +147,14 @@ fdevent* fdevent_create(int fd, fd_func func, void* arg) { return fde; } fdevent* fdevent_create(int fd, fd_func func, void* arg) { return fdevent_create_impl(fd, func, arg); } fdevent* fdevent_create(int fd, fd_func2 func, void* arg) { return fdevent_create_impl(fd, func, arg); } unique_fd fdevent_release(fdevent* fde) { check_main_thread(); if (!fde) { Loading Loading @@ -290,13 +295,27 @@ static void fdevent_process() { } } template <class T> struct always_false : std::false_type {}; static void fdevent_call_fdfunc(fdevent* fde) { unsigned events = fde->events; fde->events = 0; CHECK(fde->state & FDE_PENDING); fde->state &= (~FDE_PENDING); D("fdevent_call_fdfunc %s", dump_fde(fde).c_str()); fde->func(fde->fd.get(), events, fde->arg); std::visit( [&](auto&& f) { using F = std::decay_t<decltype(f)>; if constexpr (std::is_same_v<fd_func, F>) { f(fde->fd.get(), events, fde->arg); } else if constexpr (std::is_same_v<fd_func2, F>) { f(fde, events, fde->arg); } else { static_assert(always_false<F>::value, "non-exhaustive visitor"); } }, fde->func); } static void fdevent_run_flush() EXCLUDES(run_queue_mutex) { Loading
adb/fdevent.h +8 −9 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <stdint.h> /* for int64_t */ #include <functional> #include <variant> #include "adb_unique_fd.h" Loading @@ -30,6 +31,7 @@ #define FDE_ERROR 0x0004 typedef void (*fd_func)(int fd, unsigned events, void *userdata); typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata); struct fdevent { uint64_t id; Loading @@ -40,15 +42,14 @@ struct fdevent { uint16_t state = 0; uint16_t events = 0; fd_func func = nullptr; std::variant<fd_func, fd_func2> func; void* arg = nullptr; }; /* Allocate and initialize a new fdevent object * Note: use FD_TIMER as 'fd' to create a fd-less object * (used to implement timers). */ // Allocate and initialize a new fdevent object // TODO: Switch these to unique_fd. fdevent *fdevent_create(int fd, fd_func func, void *arg); fdevent* fdevent_create(int fd, fd_func2 func, void* arg); // Deallocate an fdevent object that was created by fdevent_create. void fdevent_destroy(fdevent *fde); Loading @@ -56,16 +57,14 @@ void fdevent_destroy(fdevent *fde); // fdevent_destroy, except releasing the file descriptor previously owned by the fdevent. unique_fd fdevent_release(fdevent* fde); /* Change which events should cause notifications */ // Change which events should cause notifications void fdevent_set(fdevent *fde, unsigned events); void fdevent_add(fdevent *fde, unsigned events); void fdevent_del(fdevent *fde, unsigned events); void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms); /* loop forever, handling events. */ // Loop forever, handling events. void fdevent_loop(); void check_main_thread(); Loading
adb/fdevent_test.cpp +82 −49 Original line number Diff line number Diff line Loading @@ -30,11 +30,17 @@ class FdHandler { public: FdHandler(int read_fd, int write_fd) : read_fd_(read_fd), write_fd_(write_fd) { FdHandler(int read_fd, int write_fd, bool use_new_callback) : read_fd_(read_fd), write_fd_(write_fd) { if (use_new_callback) { read_fde_ = fdevent_create(read_fd_, FdEventNewCallback, this); write_fde_ = fdevent_create(write_fd_, FdEventNewCallback, this); } else { read_fde_ = fdevent_create(read_fd_, FdEventCallback, this); fdevent_add(read_fde_, FDE_READ); write_fde_ = fdevent_create(write_fd_, FdEventCallback, this); } fdevent_add(read_fde_, FDE_READ); } ~FdHandler() { fdevent_destroy(read_fde_); Loading Loading @@ -64,6 +70,29 @@ class FdHandler { } } static void FdEventNewCallback(fdevent* fde, unsigned events, void* userdata) { int fd = fde->fd.get(); FdHandler* handler = reinterpret_cast<FdHandler*>(userdata); ASSERT_EQ(0u, (events & ~(FDE_READ | FDE_WRITE))) << "unexpected events: " << events; if (events & FDE_READ) { ASSERT_EQ(fd, handler->read_fd_); char c; ASSERT_EQ(1, adb_read(fd, &c, 1)); handler->queue_.push(c); fdevent_add(handler->write_fde_, FDE_WRITE); } if (events & FDE_WRITE) { ASSERT_EQ(fd, handler->write_fd_); ASSERT_FALSE(handler->queue_.empty()); char c = handler->queue_.front(); handler->queue_.pop(); ASSERT_EQ(1, adb_write(fd, &c, 1)); if (handler->queue_.empty()) { fdevent_del(handler->write_fde_, FDE_WRITE); } } } private: const int read_fd_; const int write_fd_; Loading @@ -84,6 +113,8 @@ TEST_F(FdeventTest, fdevent_terminate) { } TEST_F(FdeventTest, smoke) { for (bool use_new_callback : {true, false}) { fdevent_reset(); const size_t PIPE_COUNT = 10; const size_t MESSAGE_LOOP_COUNT = 100; const std::string MESSAGE = "fdevent_test"; Loading @@ -101,7 +132,7 @@ TEST_F(FdeventTest, smoke) { PrepareThread(); std::vector<std::unique_ptr<FdHandler>> fd_handlers; fdevent_run_on_main_thread([&thread_arg, &fd_handlers]() { fdevent_run_on_main_thread([&thread_arg, &fd_handlers, use_new_callback]() { std::vector<int> read_fds; std::vector<int> write_fds; Loading @@ -115,7 +146,8 @@ TEST_F(FdeventTest, smoke) { write_fds.push_back(thread_arg.last_write_fd); for (size_t i = 0; i < read_fds.size(); ++i) { fd_handlers.push_back(std::make_unique<FdHandler>(read_fds[i], write_fds[i])); fd_handlers.push_back( std::make_unique<FdHandler>(read_fds[i], write_fds[i], use_new_callback)); } }); WaitForFdeventLoop(); Loading @@ -135,6 +167,7 @@ TEST_F(FdeventTest, smoke) { ASSERT_EQ(0, adb_close(writer)); ASSERT_EQ(0, adb_close(reader)); } } struct InvalidFdArg { fdevent* fde; Loading
base/include/android-base/unique_fd.h +5 −1 Original line number Diff line number Diff line Loading @@ -17,10 +17,10 @@ #pragma once #include <dirent.h> #include <errno.h> #include <fcntl.h> #if !defined(_WIN32) #include <dirent.h> #include <sys/socket.h> #endif Loading Loading @@ -114,6 +114,8 @@ class unique_fd_impl final { private: void reset(int new_value, void* previous_tag) { int previous_errno = errno; if (fd_ != -1) { close(fd_, this); } Loading @@ -122,6 +124,8 @@ class unique_fd_impl final { if (new_value != -1) { tag(new_value, previous_tag, this); } errno = previous_errno; } int fd_ = -1; Loading