Loading adb/fdevent.cpp +62 −4 Original line number Original line Diff line number Diff line Loading @@ -32,6 +32,7 @@ #include <functional> #include <functional> #include <list> #include <list> #include <mutex> #include <mutex> #include <optional> #include <unordered_map> #include <unordered_map> #include <utility> #include <utility> #include <variant> #include <variant> Loading Loading @@ -225,14 +226,22 @@ void fdevent_set(fdevent* fde, unsigned events) { void fdevent_add(fdevent* fde, unsigned events) { void fdevent_add(fdevent* fde, unsigned events) { check_main_thread(); check_main_thread(); CHECK(!(events & FDE_TIMEOUT)); fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events); fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events); } } void fdevent_del(fdevent* fde, unsigned events) { void fdevent_del(fdevent* fde, unsigned events) { check_main_thread(); check_main_thread(); CHECK(!(events & FDE_TIMEOUT)); fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); } } void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) { check_main_thread(); fde->timeout = timeout; fde->last_active = std::chrono::steady_clock::now(); } static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { std::string result; std::string result; for (const auto& pollfd : pollfds) { for (const auto& pollfd : pollfds) { Loading @@ -248,6 +257,32 @@ static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { return result; return result; } } static std::optional<std::chrono::milliseconds> calculate_timeout() { std::optional<std::chrono::milliseconds> result = std::nullopt; auto now = std::chrono::steady_clock::now(); check_main_thread(); for (const auto& [fd, pollnode] : g_poll_node_map) { UNUSED(fd); auto timeout_opt = pollnode.fde->timeout; if (timeout_opt) { auto deadline = pollnode.fde->last_active + *timeout_opt; auto time_left = std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now); if (time_left < std::chrono::milliseconds::zero()) { time_left = std::chrono::milliseconds::zero(); } if (!result) { result = time_left; } else { result = std::min(*result, time_left); } } } return result; } static void fdevent_process() { static void fdevent_process() { std::vector<adb_pollfd> pollfds; std::vector<adb_pollfd> pollfds; for (const auto& pair : g_poll_node_map) { for (const auto& pair : g_poll_node_map) { Loading @@ -256,11 +291,22 @@ static void fdevent_process() { CHECK_GT(pollfds.size(), 0u); CHECK_GT(pollfds.size(), 0u); D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); int ret = adb_poll(&pollfds[0], pollfds.size(), -1); auto timeout = calculate_timeout(); int timeout_ms; if (!timeout) { timeout_ms = -1; } else { timeout_ms = timeout->count(); } int ret = adb_poll(&pollfds[0], pollfds.size(), timeout_ms); if (ret == -1) { if (ret == -1) { PLOG(ERROR) << "poll(), ret = " << ret; PLOG(ERROR) << "poll(), ret = " << ret; return; return; } } auto post_poll = std::chrono::steady_clock::now(); for (const auto& pollfd : pollfds) { for (const auto& pollfd : pollfds) { if (pollfd.revents != 0) { if (pollfd.revents != 0) { D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); Loading @@ -282,12 +328,24 @@ static void fdevent_process() { events |= FDE_READ | FDE_ERROR; events |= FDE_READ | FDE_ERROR; } } #endif #endif if (events != 0) { auto it = g_poll_node_map.find(pollfd.fd); auto it = g_poll_node_map.find(pollfd.fd); CHECK(it != g_poll_node_map.end()); CHECK(it != g_poll_node_map.end()); fdevent* fde = it->second.fde; fdevent* fde = it->second.fde; if (events == 0) { // Check for timeout. if (fde->timeout) { auto deadline = fde->last_active + *fde->timeout; if (deadline < post_poll) { events |= FDE_TIMEOUT; } } } if (events != 0) { CHECK_EQ(fde->fd.get(), pollfd.fd); CHECK_EQ(fde->fd.get(), pollfd.fd); fde->events |= events; fde->events |= events; fde->last_active = post_poll; D("%s got events %x", dump_fde(fde).c_str(), events); D("%s got events %x", dump_fde(fde).c_str(), events); fde->state |= FDE_PENDING; fde->state |= FDE_PENDING; g_pending_list.push_back(fde); g_pending_list.push_back(fde); Loading adb/fdevent.h +15 −6 Original line number Original line Diff line number Diff line Loading @@ -18,17 +18,20 @@ #define __FDEVENT_H #define __FDEVENT_H #include <stddef.h> #include <stddef.h> #include <stdint.h> /* for int64_t */ #include <stdint.h> #include <chrono> #include <functional> #include <functional> #include <optional> #include <variant> #include <variant> #include "adb_unique_fd.h" #include "adb_unique_fd.h" /* events that may be observed */ // Events that may be observed #define FDE_READ 0x0001 #define FDE_READ 0x0001 #define FDE_WRITE 0x0002 #define FDE_WRITE 0x0002 #define FDE_ERROR 0x0004 #define FDE_ERROR 0x0004 #define FDE_TIMEOUT 0x0008 typedef void (*fd_func)(int fd, unsigned events, void *userdata); typedef void (*fd_func)(int fd, unsigned events, void *userdata); typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata); typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata); Loading @@ -41,6 +44,8 @@ struct fdevent { uint16_t state = 0; uint16_t state = 0; uint16_t events = 0; uint16_t events = 0; std::optional<std::chrono::milliseconds> timeout; std::chrono::steady_clock::time_point last_active; std::variant<fd_func, fd_func2> func; std::variant<fd_func, fd_func2> func; void* arg = nullptr; void* arg = nullptr; Loading @@ -62,7 +67,11 @@ void fdevent_set(fdevent *fde, unsigned events); void fdevent_add(fdevent *fde, unsigned events); void fdevent_add(fdevent *fde, unsigned events); void fdevent_del(fdevent *fde, unsigned events); void fdevent_del(fdevent *fde, unsigned events); void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms); // Set a timeout on an fdevent. // If no events are triggered by the timeout, an FDE_TIMEOUT will be generated. // Note timeouts are not defused automatically; if a timeout is set on an fdevent, it will // trigger repeatedly every |timeout| ms. void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout); // Loop forever, handling events. // Loop forever, handling events. void fdevent_loop(); void fdevent_loop(); Loading adb/fdevent_test.cpp +100 −0 Original line number Original line Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include <gtest/gtest.h> #include <gtest/gtest.h> #include <chrono> #include <limits> #include <limits> #include <memory> #include <memory> #include <queue> #include <queue> Loading @@ -28,6 +29,8 @@ #include "adb_io.h" #include "adb_io.h" #include "fdevent_test.h" #include "fdevent_test.h" using namespace std::chrono_literals; class FdHandler { class FdHandler { public: public: FdHandler(int read_fd, int write_fd, bool use_new_callback) FdHandler(int read_fd, int write_fd, bool use_new_callback) Loading Loading @@ -257,3 +260,100 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) { ASSERT_EQ(i, vec[i]); ASSERT_EQ(i, vec[i]); } } } } TEST_F(FdeventTest, timeout) { fdevent_reset(); PrepareThread(); enum class TimeoutEvent { read, timeout, done, }; struct TimeoutTest { std::vector<std::pair<TimeoutEvent, std::chrono::steady_clock::time_point>> events; fdevent* fde; }; TimeoutTest test; int fds[2]; ASSERT_EQ(0, adb_socketpair(fds)); static constexpr auto delta = 100ms; fdevent_run_on_main_thread([&]() { test.fde = fdevent_create(fds[0], [](fdevent* fde, unsigned events, void* arg) { auto test = static_cast<TimeoutTest*>(arg); auto now = std::chrono::steady_clock::now(); CHECK((events & FDE_READ) ^ (events & FDE_TIMEOUT)); TimeoutEvent event; if ((events & FDE_READ)) { char buf[2]; ssize_t rc = adb_read(fde->fd.get(), buf, sizeof(buf)); if (rc == 0) { event = TimeoutEvent::done; } else if (rc == 1) { event = TimeoutEvent::read; } else { abort(); } } else if ((events & FDE_TIMEOUT)) { event = TimeoutEvent::timeout; } else { abort(); } CHECK_EQ(fde, test->fde); test->events.emplace_back(event, now); if (event == TimeoutEvent::done) { fdevent_destroy(fde); } }, &test); fdevent_add(test.fde, FDE_READ); fdevent_set_timeout(test.fde, delta); }); ASSERT_EQ(1, adb_write(fds[1], "", 1)); // Timeout should happen here std::this_thread::sleep_for(delta); // and another. std::this_thread::sleep_for(delta); // No timeout should happen here. std::this_thread::sleep_for(delta / 2); adb_close(fds[1]); TerminateThread(); ASSERT_EQ(4ULL, test.events.size()); ASSERT_EQ(TimeoutEvent::read, test.events[0].first); ASSERT_EQ(TimeoutEvent::timeout, test.events[1].first); ASSERT_EQ(TimeoutEvent::timeout, test.events[2].first); ASSERT_EQ(TimeoutEvent::done, test.events[3].first); std::vector<int> time_deltas; for (size_t i = 0; i < test.events.size() - 1; ++i) { auto before = test.events[i].second; auto after = test.events[i + 1].second; auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(after - before); time_deltas.push_back(diff.count()); } std::vector<int> expected = { delta.count(), delta.count(), delta.count() / 2, }; std::vector<int> diff; ASSERT_EQ(time_deltas.size(), expected.size()); for (size_t i = 0; i < time_deltas.size(); ++i) { diff.push_back(std::abs(time_deltas[i] - expected[i])); } ASSERT_LT(diff[0], delta.count() * 0.5); ASSERT_LT(diff[1], delta.count() * 0.5); ASSERT_LT(diff[2], delta.count() * 0.5); } adb/socket_test.cpp +2 −0 Original line number Original line Diff line number Diff line Loading @@ -221,6 +221,8 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); ASSERT_EQ(0, adb_close(socket_fd[0])); ASSERT_EQ(0, adb_close(socket_fd[0])); std::this_thread::sleep_for(2s); WaitForFdeventLoop(); WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); TerminateThread(); TerminateThread(); Loading adb/sockets.cpp +55 −4 Original line number Original line Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <unistd.h> #include <unistd.h> #include <algorithm> #include <algorithm> #include <chrono> #include <mutex> #include <mutex> #include <string> #include <string> #include <vector> #include <vector> Loading @@ -41,6 +42,8 @@ #include "transport.h" #include "transport.h" #include "types.h" #include "types.h" using namespace std::chrono_literals; static std::recursive_mutex& local_socket_list_lock = *new std::recursive_mutex(); static std::recursive_mutex& local_socket_list_lock = *new std::recursive_mutex(); static unsigned local_socket_next_id = 1; static unsigned local_socket_next_id = 1; Loading Loading @@ -238,16 +241,64 @@ static void local_socket_ready(asocket* s) { fdevent_add(s->fde, FDE_READ); fdevent_add(s->fde, FDE_READ); } } struct ClosingSocket { std::chrono::steady_clock::time_point begin; }; // The standard (RFC 1122 - 4.2.2.13) says that if we call close on a // socket while we have pending data, a TCP RST should be sent to the // other end to notify it that we didn't read all of its data. However, // this can result in data that we've successfully written out to be dropped // on the other end. To avoid this, instead of immediately closing a // socket, call shutdown on it instead, and then read from the file // descriptor until we hit EOF or an error before closing. static void deferred_close(unique_fd fd) { // Shutdown the socket in the outgoing direction only, so that // we don't have the same problem on the opposite end. adb_shutdown(fd.get(), SHUT_WR); auto callback = [](fdevent* fde, unsigned event, void* arg) { auto socket_info = static_cast<ClosingSocket*>(arg); if (event & FDE_READ) { ssize_t rc; char buf[BUFSIZ]; while ((rc = adb_read(fde->fd.get(), buf, sizeof(buf))) > 0) { continue; } if (rc == -1 && errno == EAGAIN) { // There's potentially more data to read. auto duration = std::chrono::steady_clock::now() - socket_info->begin; if (duration > 1s) { LOG(WARNING) << "timeout expired while flushing socket, closing"; } else { return; } } } else if (event & FDE_TIMEOUT) { LOG(WARNING) << "timeout expired while flushing socket, closing"; } // Either there was an error, we hit the end of the socket, or our timeout expired. fdevent_destroy(fde); delete socket_info; }; ClosingSocket* socket_info = new ClosingSocket{ .begin = std::chrono::steady_clock::now(), }; fdevent* fde = fdevent_create(fd.release(), callback, socket_info); fdevent_add(fde, FDE_READ); fdevent_set_timeout(fde, 1s); } // be sure to hold the socket list lock when calling this // be sure to hold the socket list lock when calling this static void local_socket_destroy(asocket* s) { static void local_socket_destroy(asocket* s) { int exit_on_close = s->exit_on_close; int exit_on_close = s->exit_on_close; D("LS(%d): destroying fde.fd=%d", s->id, s->fd); D("LS(%d): destroying fde.fd=%d", s->id, s->fd); /* IMPORTANT: the remove closes the fd deferred_close(fdevent_release(s->fde)); ** that belongs to this socket */ fdevent_destroy(s->fde); remove_socket(s); remove_socket(s); delete s; delete s; Loading Loading
adb/fdevent.cpp +62 −4 Original line number Original line Diff line number Diff line Loading @@ -32,6 +32,7 @@ #include <functional> #include <functional> #include <list> #include <list> #include <mutex> #include <mutex> #include <optional> #include <unordered_map> #include <unordered_map> #include <utility> #include <utility> #include <variant> #include <variant> Loading Loading @@ -225,14 +226,22 @@ void fdevent_set(fdevent* fde, unsigned events) { void fdevent_add(fdevent* fde, unsigned events) { void fdevent_add(fdevent* fde, unsigned events) { check_main_thread(); check_main_thread(); CHECK(!(events & FDE_TIMEOUT)); fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events); fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events); } } void fdevent_del(fdevent* fde, unsigned events) { void fdevent_del(fdevent* fde, unsigned events) { check_main_thread(); check_main_thread(); CHECK(!(events & FDE_TIMEOUT)); fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events); } } void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) { check_main_thread(); fde->timeout = timeout; fde->last_active = std::chrono::steady_clock::now(); } static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { std::string result; std::string result; for (const auto& pollfd : pollfds) { for (const auto& pollfd : pollfds) { Loading @@ -248,6 +257,32 @@ static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) { return result; return result; } } static std::optional<std::chrono::milliseconds> calculate_timeout() { std::optional<std::chrono::milliseconds> result = std::nullopt; auto now = std::chrono::steady_clock::now(); check_main_thread(); for (const auto& [fd, pollnode] : g_poll_node_map) { UNUSED(fd); auto timeout_opt = pollnode.fde->timeout; if (timeout_opt) { auto deadline = pollnode.fde->last_active + *timeout_opt; auto time_left = std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now); if (time_left < std::chrono::milliseconds::zero()) { time_left = std::chrono::milliseconds::zero(); } if (!result) { result = time_left; } else { result = std::min(*result, time_left); } } } return result; } static void fdevent_process() { static void fdevent_process() { std::vector<adb_pollfd> pollfds; std::vector<adb_pollfd> pollfds; for (const auto& pair : g_poll_node_map) { for (const auto& pair : g_poll_node_map) { Loading @@ -256,11 +291,22 @@ static void fdevent_process() { CHECK_GT(pollfds.size(), 0u); CHECK_GT(pollfds.size(), 0u); D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str()); int ret = adb_poll(&pollfds[0], pollfds.size(), -1); auto timeout = calculate_timeout(); int timeout_ms; if (!timeout) { timeout_ms = -1; } else { timeout_ms = timeout->count(); } int ret = adb_poll(&pollfds[0], pollfds.size(), timeout_ms); if (ret == -1) { if (ret == -1) { PLOG(ERROR) << "poll(), ret = " << ret; PLOG(ERROR) << "poll(), ret = " << ret; return; return; } } auto post_poll = std::chrono::steady_clock::now(); for (const auto& pollfd : pollfds) { for (const auto& pollfd : pollfds) { if (pollfd.revents != 0) { if (pollfd.revents != 0) { D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); D("for fd %d, revents = %x", pollfd.fd, pollfd.revents); Loading @@ -282,12 +328,24 @@ static void fdevent_process() { events |= FDE_READ | FDE_ERROR; events |= FDE_READ | FDE_ERROR; } } #endif #endif if (events != 0) { auto it = g_poll_node_map.find(pollfd.fd); auto it = g_poll_node_map.find(pollfd.fd); CHECK(it != g_poll_node_map.end()); CHECK(it != g_poll_node_map.end()); fdevent* fde = it->second.fde; fdevent* fde = it->second.fde; if (events == 0) { // Check for timeout. if (fde->timeout) { auto deadline = fde->last_active + *fde->timeout; if (deadline < post_poll) { events |= FDE_TIMEOUT; } } } if (events != 0) { CHECK_EQ(fde->fd.get(), pollfd.fd); CHECK_EQ(fde->fd.get(), pollfd.fd); fde->events |= events; fde->events |= events; fde->last_active = post_poll; D("%s got events %x", dump_fde(fde).c_str(), events); D("%s got events %x", dump_fde(fde).c_str(), events); fde->state |= FDE_PENDING; fde->state |= FDE_PENDING; g_pending_list.push_back(fde); g_pending_list.push_back(fde); Loading
adb/fdevent.h +15 −6 Original line number Original line Diff line number Diff line Loading @@ -18,17 +18,20 @@ #define __FDEVENT_H #define __FDEVENT_H #include <stddef.h> #include <stddef.h> #include <stdint.h> /* for int64_t */ #include <stdint.h> #include <chrono> #include <functional> #include <functional> #include <optional> #include <variant> #include <variant> #include "adb_unique_fd.h" #include "adb_unique_fd.h" /* events that may be observed */ // Events that may be observed #define FDE_READ 0x0001 #define FDE_READ 0x0001 #define FDE_WRITE 0x0002 #define FDE_WRITE 0x0002 #define FDE_ERROR 0x0004 #define FDE_ERROR 0x0004 #define FDE_TIMEOUT 0x0008 typedef void (*fd_func)(int fd, unsigned events, void *userdata); typedef void (*fd_func)(int fd, unsigned events, void *userdata); typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata); typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata); Loading @@ -41,6 +44,8 @@ struct fdevent { uint16_t state = 0; uint16_t state = 0; uint16_t events = 0; uint16_t events = 0; std::optional<std::chrono::milliseconds> timeout; std::chrono::steady_clock::time_point last_active; std::variant<fd_func, fd_func2> func; std::variant<fd_func, fd_func2> func; void* arg = nullptr; void* arg = nullptr; Loading @@ -62,7 +67,11 @@ void fdevent_set(fdevent *fde, unsigned events); void fdevent_add(fdevent *fde, unsigned events); void fdevent_add(fdevent *fde, unsigned events); void fdevent_del(fdevent *fde, unsigned events); void fdevent_del(fdevent *fde, unsigned events); void fdevent_set_timeout(fdevent *fde, int64_t timeout_ms); // Set a timeout on an fdevent. // If no events are triggered by the timeout, an FDE_TIMEOUT will be generated. // Note timeouts are not defused automatically; if a timeout is set on an fdevent, it will // trigger repeatedly every |timeout| ms. void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout); // Loop forever, handling events. // Loop forever, handling events. void fdevent_loop(); void fdevent_loop(); Loading
adb/fdevent_test.cpp +100 −0 Original line number Original line Diff line number Diff line Loading @@ -18,6 +18,7 @@ #include <gtest/gtest.h> #include <gtest/gtest.h> #include <chrono> #include <limits> #include <limits> #include <memory> #include <memory> #include <queue> #include <queue> Loading @@ -28,6 +29,8 @@ #include "adb_io.h" #include "adb_io.h" #include "fdevent_test.h" #include "fdevent_test.h" using namespace std::chrono_literals; class FdHandler { class FdHandler { public: public: FdHandler(int read_fd, int write_fd, bool use_new_callback) FdHandler(int read_fd, int write_fd, bool use_new_callback) Loading Loading @@ -257,3 +260,100 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) { ASSERT_EQ(i, vec[i]); ASSERT_EQ(i, vec[i]); } } } } TEST_F(FdeventTest, timeout) { fdevent_reset(); PrepareThread(); enum class TimeoutEvent { read, timeout, done, }; struct TimeoutTest { std::vector<std::pair<TimeoutEvent, std::chrono::steady_clock::time_point>> events; fdevent* fde; }; TimeoutTest test; int fds[2]; ASSERT_EQ(0, adb_socketpair(fds)); static constexpr auto delta = 100ms; fdevent_run_on_main_thread([&]() { test.fde = fdevent_create(fds[0], [](fdevent* fde, unsigned events, void* arg) { auto test = static_cast<TimeoutTest*>(arg); auto now = std::chrono::steady_clock::now(); CHECK((events & FDE_READ) ^ (events & FDE_TIMEOUT)); TimeoutEvent event; if ((events & FDE_READ)) { char buf[2]; ssize_t rc = adb_read(fde->fd.get(), buf, sizeof(buf)); if (rc == 0) { event = TimeoutEvent::done; } else if (rc == 1) { event = TimeoutEvent::read; } else { abort(); } } else if ((events & FDE_TIMEOUT)) { event = TimeoutEvent::timeout; } else { abort(); } CHECK_EQ(fde, test->fde); test->events.emplace_back(event, now); if (event == TimeoutEvent::done) { fdevent_destroy(fde); } }, &test); fdevent_add(test.fde, FDE_READ); fdevent_set_timeout(test.fde, delta); }); ASSERT_EQ(1, adb_write(fds[1], "", 1)); // Timeout should happen here std::this_thread::sleep_for(delta); // and another. std::this_thread::sleep_for(delta); // No timeout should happen here. std::this_thread::sleep_for(delta / 2); adb_close(fds[1]); TerminateThread(); ASSERT_EQ(4ULL, test.events.size()); ASSERT_EQ(TimeoutEvent::read, test.events[0].first); ASSERT_EQ(TimeoutEvent::timeout, test.events[1].first); ASSERT_EQ(TimeoutEvent::timeout, test.events[2].first); ASSERT_EQ(TimeoutEvent::done, test.events[3].first); std::vector<int> time_deltas; for (size_t i = 0; i < test.events.size() - 1; ++i) { auto before = test.events[i].second; auto after = test.events[i + 1].second; auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(after - before); time_deltas.push_back(diff.count()); } std::vector<int> expected = { delta.count(), delta.count(), delta.count() / 2, }; std::vector<int> diff; ASSERT_EQ(time_deltas.size(), expected.size()); for (size_t i = 0; i < time_deltas.size(); ++i) { diff.push_back(std::abs(time_deltas[i] - expected[i])); } ASSERT_LT(diff[0], delta.count() * 0.5); ASSERT_LT(diff[1], delta.count() * 0.5); ASSERT_LT(diff[2], delta.count() * 0.5); }
adb/socket_test.cpp +2 −0 Original line number Original line Diff line number Diff line Loading @@ -221,6 +221,8 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) { EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count()); ASSERT_EQ(0, adb_close(socket_fd[0])); ASSERT_EQ(0, adb_close(socket_fd[0])); std::this_thread::sleep_for(2s); WaitForFdeventLoop(); WaitForFdeventLoop(); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count()); TerminateThread(); TerminateThread(); Loading
adb/sockets.cpp +55 −4 Original line number Original line Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <unistd.h> #include <unistd.h> #include <algorithm> #include <algorithm> #include <chrono> #include <mutex> #include <mutex> #include <string> #include <string> #include <vector> #include <vector> Loading @@ -41,6 +42,8 @@ #include "transport.h" #include "transport.h" #include "types.h" #include "types.h" using namespace std::chrono_literals; static std::recursive_mutex& local_socket_list_lock = *new std::recursive_mutex(); static std::recursive_mutex& local_socket_list_lock = *new std::recursive_mutex(); static unsigned local_socket_next_id = 1; static unsigned local_socket_next_id = 1; Loading Loading @@ -238,16 +241,64 @@ static void local_socket_ready(asocket* s) { fdevent_add(s->fde, FDE_READ); fdevent_add(s->fde, FDE_READ); } } struct ClosingSocket { std::chrono::steady_clock::time_point begin; }; // The standard (RFC 1122 - 4.2.2.13) says that if we call close on a // socket while we have pending data, a TCP RST should be sent to the // other end to notify it that we didn't read all of its data. However, // this can result in data that we've successfully written out to be dropped // on the other end. To avoid this, instead of immediately closing a // socket, call shutdown on it instead, and then read from the file // descriptor until we hit EOF or an error before closing. static void deferred_close(unique_fd fd) { // Shutdown the socket in the outgoing direction only, so that // we don't have the same problem on the opposite end. adb_shutdown(fd.get(), SHUT_WR); auto callback = [](fdevent* fde, unsigned event, void* arg) { auto socket_info = static_cast<ClosingSocket*>(arg); if (event & FDE_READ) { ssize_t rc; char buf[BUFSIZ]; while ((rc = adb_read(fde->fd.get(), buf, sizeof(buf))) > 0) { continue; } if (rc == -1 && errno == EAGAIN) { // There's potentially more data to read. auto duration = std::chrono::steady_clock::now() - socket_info->begin; if (duration > 1s) { LOG(WARNING) << "timeout expired while flushing socket, closing"; } else { return; } } } else if (event & FDE_TIMEOUT) { LOG(WARNING) << "timeout expired while flushing socket, closing"; } // Either there was an error, we hit the end of the socket, or our timeout expired. fdevent_destroy(fde); delete socket_info; }; ClosingSocket* socket_info = new ClosingSocket{ .begin = std::chrono::steady_clock::now(), }; fdevent* fde = fdevent_create(fd.release(), callback, socket_info); fdevent_add(fde, FDE_READ); fdevent_set_timeout(fde, 1s); } // be sure to hold the socket list lock when calling this // be sure to hold the socket list lock when calling this static void local_socket_destroy(asocket* s) { static void local_socket_destroy(asocket* s) { int exit_on_close = s->exit_on_close; int exit_on_close = s->exit_on_close; D("LS(%d): destroying fde.fd=%d", s->id, s->fd); D("LS(%d): destroying fde.fd=%d", s->id, s->fd); /* IMPORTANT: the remove closes the fd deferred_close(fdevent_release(s->fde)); ** that belongs to this socket */ fdevent_destroy(s->fde); remove_socket(s); remove_socket(s); delete s; delete s; Loading