Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0220ca7d authored by Josh Gao's avatar Josh Gao Committed by Gerrit Code Review
Browse files

Merge changes I1abd671f,I9ae61465

* changes:
  adb: don't close sockets before hitting EOF.
  adb: implement fdevent_set_timeout.
parents bec58544 74b7ec72
Loading
Loading
Loading
Loading
+62 −4
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@
#include <functional>
#include <list>
#include <mutex>
#include <optional>
#include <unordered_map>
#include <utility>
#include <variant>
@@ -225,14 +226,22 @@ void fdevent_set(fdevent* fde, unsigned events) {

void fdevent_add(fdevent* fde, unsigned events) {
    check_main_thread();
    CHECK(!(events & FDE_TIMEOUT));
    fdevent_set(fde, (fde->state & FDE_EVENTMASK) | events);
}

void fdevent_del(fdevent* fde, unsigned events) {
    check_main_thread();
    CHECK(!(events & FDE_TIMEOUT));
    fdevent_set(fde, (fde->state & FDE_EVENTMASK) & ~events);
}

void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout) {
    check_main_thread();
    fde->timeout = timeout;
    fde->last_active = std::chrono::steady_clock::now();
}

static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) {
    std::string result;
    for (const auto& pollfd : pollfds) {
@@ -248,6 +257,32 @@ static std::string dump_pollfds(const std::vector<adb_pollfd>& pollfds) {
    return result;
}

static std::optional<std::chrono::milliseconds> calculate_timeout() {
    std::optional<std::chrono::milliseconds> result = std::nullopt;
    auto now = std::chrono::steady_clock::now();
    check_main_thread();

    for (const auto& [fd, pollnode] : g_poll_node_map) {
        UNUSED(fd);
        auto timeout_opt = pollnode.fde->timeout;
        if (timeout_opt) {
            auto deadline = pollnode.fde->last_active + *timeout_opt;
            auto time_left = std::chrono::duration_cast<std::chrono::milliseconds>(deadline - now);
            if (time_left < std::chrono::milliseconds::zero()) {
                time_left = std::chrono::milliseconds::zero();
            }

            if (!result) {
                result = time_left;
            } else {
                result = std::min(*result, time_left);
            }
        }
    }

    return result;
}

static void fdevent_process() {
    std::vector<adb_pollfd> pollfds;
    for (const auto& pair : g_poll_node_map) {
@@ -256,11 +291,22 @@ static void fdevent_process() {
    CHECK_GT(pollfds.size(), 0u);
    D("poll(), pollfds = %s", dump_pollfds(pollfds).c_str());

    int ret = adb_poll(&pollfds[0], pollfds.size(), -1);
    auto timeout = calculate_timeout();
    int timeout_ms;
    if (!timeout) {
        timeout_ms = -1;
    } else {
        timeout_ms = timeout->count();
    }

    int ret = adb_poll(&pollfds[0], pollfds.size(), timeout_ms);
    if (ret == -1) {
        PLOG(ERROR) << "poll(), ret = " << ret;
        return;
    }

    auto post_poll = std::chrono::steady_clock::now();

    for (const auto& pollfd : pollfds) {
        if (pollfd.revents != 0) {
            D("for fd %d, revents = %x", pollfd.fd, pollfd.revents);
@@ -282,12 +328,24 @@ static void fdevent_process() {
            events |= FDE_READ | FDE_ERROR;
        }
#endif
        if (events != 0) {
        auto it = g_poll_node_map.find(pollfd.fd);
        CHECK(it != g_poll_node_map.end());
        fdevent* fde = it->second.fde;

        if (events == 0) {
            // Check for timeout.
            if (fde->timeout) {
                auto deadline = fde->last_active + *fde->timeout;
                if (deadline < post_poll) {
                    events |= FDE_TIMEOUT;
                }
            }
        }

        if (events != 0) {
            CHECK_EQ(fde->fd.get(), pollfd.fd);
            fde->events |= events;
            fde->last_active = post_poll;
            D("%s got events %x", dump_fde(fde).c_str(), events);
            fde->state |= FDE_PENDING;
            g_pending_list.push_back(fde);
+15 −6
Original line number Diff line number Diff line
@@ -18,17 +18,20 @@
#define __FDEVENT_H

#include <stddef.h>
#include <stdint.h>  /* for int64_t */
#include <stdint.h>

#include <chrono>
#include <functional>
#include <optional>
#include <variant>

#include "adb_unique_fd.h"

/* events that may be observed */
// Events that may be observed
#define FDE_READ 0x0001
#define FDE_WRITE 0x0002
#define FDE_ERROR 0x0004
#define FDE_TIMEOUT 0x0008

typedef void (*fd_func)(int fd, unsigned events, void *userdata);
typedef void (*fd_func2)(struct fdevent* fde, unsigned events, void* userdata);
@@ -41,6 +44,8 @@ struct fdevent {

    uint16_t state = 0;
    uint16_t events = 0;
    std::optional<std::chrono::milliseconds> timeout;
    std::chrono::steady_clock::time_point last_active;

    std::variant<fd_func, fd_func2> func;
    void* arg = nullptr;
@@ -62,7 +67,11 @@ void fdevent_set(fdevent *fde, unsigned events);
void fdevent_add(fdevent *fde, unsigned events);
void fdevent_del(fdevent *fde, unsigned events);

void fdevent_set_timeout(fdevent *fde, int64_t  timeout_ms);
// Set a timeout on an fdevent.
// If no events are triggered by the timeout, an FDE_TIMEOUT will be generated.
// Note timeouts are not defused automatically; if a timeout is set on an fdevent, it will
// trigger repeatedly every |timeout| ms.
void fdevent_set_timeout(fdevent* fde, std::optional<std::chrono::milliseconds> timeout);

// Loop forever, handling events.
void fdevent_loop();
+100 −0
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@

#include <gtest/gtest.h>

#include <chrono>
#include <limits>
#include <memory>
#include <queue>
@@ -28,6 +29,8 @@
#include "adb_io.h"
#include "fdevent_test.h"

using namespace std::chrono_literals;

class FdHandler {
  public:
    FdHandler(int read_fd, int write_fd, bool use_new_callback)
@@ -257,3 +260,100 @@ TEST_F(FdeventTest, run_on_main_thread_reentrant) {
        ASSERT_EQ(i, vec[i]);
    }
}

TEST_F(FdeventTest, timeout) {
    fdevent_reset();
    PrepareThread();

    enum class TimeoutEvent {
        read,
        timeout,
        done,
    };

    struct TimeoutTest {
        std::vector<std::pair<TimeoutEvent, std::chrono::steady_clock::time_point>> events;
        fdevent* fde;
    };
    TimeoutTest test;

    int fds[2];
    ASSERT_EQ(0, adb_socketpair(fds));
    static constexpr auto delta = 100ms;
    fdevent_run_on_main_thread([&]() {
        test.fde = fdevent_create(fds[0], [](fdevent* fde, unsigned events, void* arg) {
            auto test = static_cast<TimeoutTest*>(arg);
            auto now = std::chrono::steady_clock::now();
            CHECK((events & FDE_READ) ^ (events & FDE_TIMEOUT));
            TimeoutEvent event;
            if ((events & FDE_READ)) {
                char buf[2];
                ssize_t rc = adb_read(fde->fd.get(), buf, sizeof(buf));
                if (rc == 0) {
                    event = TimeoutEvent::done;
                } else if (rc == 1) {
                    event = TimeoutEvent::read;
                } else {
                    abort();
                }
            } else if ((events & FDE_TIMEOUT)) {
                event = TimeoutEvent::timeout;
            } else {
                abort();
            }

            CHECK_EQ(fde, test->fde);
            test->events.emplace_back(event, now);

            if (event == TimeoutEvent::done) {
                fdevent_destroy(fde);
            }
        }, &test);
        fdevent_add(test.fde, FDE_READ);
        fdevent_set_timeout(test.fde, delta);
    });

    ASSERT_EQ(1, adb_write(fds[1], "", 1));

    // Timeout should happen here
    std::this_thread::sleep_for(delta);

    // and another.
    std::this_thread::sleep_for(delta);

    // No timeout should happen here.
    std::this_thread::sleep_for(delta / 2);
    adb_close(fds[1]);

    TerminateThread();

    ASSERT_EQ(4ULL, test.events.size());
    ASSERT_EQ(TimeoutEvent::read, test.events[0].first);
    ASSERT_EQ(TimeoutEvent::timeout, test.events[1].first);
    ASSERT_EQ(TimeoutEvent::timeout, test.events[2].first);
    ASSERT_EQ(TimeoutEvent::done, test.events[3].first);

    std::vector<int> time_deltas;
    for (size_t i = 0; i < test.events.size() - 1; ++i) {
        auto before = test.events[i].second;
        auto after = test.events[i + 1].second;
        auto diff = std::chrono::duration_cast<std::chrono::milliseconds>(after - before);
        time_deltas.push_back(diff.count());
    }

    std::vector<int> expected = {
        delta.count(),
        delta.count(),
        delta.count() / 2,
    };

    std::vector<int> diff;
    ASSERT_EQ(time_deltas.size(), expected.size());
    for (size_t i = 0; i < time_deltas.size(); ++i) {
        diff.push_back(std::abs(time_deltas[i] - expected[i]));
    }

    ASSERT_LT(diff[0], delta.count() * 0.5);
    ASSERT_LT(diff[1], delta.count() * 0.5);
    ASSERT_LT(diff[2], delta.count() * 0.5);
}
+2 −0
Original line number Diff line number Diff line
@@ -221,6 +221,8 @@ TEST_F(LocalSocketTest, write_error_when_having_packets) {
    EXPECT_EQ(2u + GetAdditionalLocalSocketCount(), fdevent_installed_count());
    ASSERT_EQ(0, adb_close(socket_fd[0]));

    std::this_thread::sleep_for(2s);

    WaitForFdeventLoop();
    ASSERT_EQ(GetAdditionalLocalSocketCount(), fdevent_installed_count());
    TerminateThread();
+55 −4
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@
#include <unistd.h>

#include <algorithm>
#include <chrono>
#include <mutex>
#include <string>
#include <vector>
@@ -41,6 +42,8 @@
#include "transport.h"
#include "types.h"

using namespace std::chrono_literals;

static std::recursive_mutex& local_socket_list_lock = *new std::recursive_mutex();
static unsigned local_socket_next_id = 1;

@@ -238,16 +241,64 @@ static void local_socket_ready(asocket* s) {
    fdevent_add(s->fde, FDE_READ);
}

struct ClosingSocket {
    std::chrono::steady_clock::time_point begin;
};

// The standard (RFC 1122 - 4.2.2.13) says that if we call close on a
// socket while we have pending data, a TCP RST should be sent to the
// other end to notify it that we didn't read all of its data. However,
// this can result in data that we've successfully written out to be dropped
// on the other end. To avoid this, instead of immediately closing a
// socket, call shutdown on it instead, and then read from the file
// descriptor until we hit EOF or an error before closing.
static void deferred_close(unique_fd fd) {
    // Shutdown the socket in the outgoing direction only, so that
    // we don't have the same problem on the opposite end.
    adb_shutdown(fd.get(), SHUT_WR);
    auto callback = [](fdevent* fde, unsigned event, void* arg) {
        auto socket_info = static_cast<ClosingSocket*>(arg);
        if (event & FDE_READ) {
            ssize_t rc;
            char buf[BUFSIZ];
            while ((rc = adb_read(fde->fd.get(), buf, sizeof(buf))) > 0) {
                continue;
            }

            if (rc == -1 && errno == EAGAIN) {
                // There's potentially more data to read.
                auto duration = std::chrono::steady_clock::now() - socket_info->begin;
                if (duration > 1s) {
                    LOG(WARNING) << "timeout expired while flushing socket, closing";
                } else {
                    return;
                }
            }
        } else if (event & FDE_TIMEOUT) {
            LOG(WARNING) << "timeout expired while flushing socket, closing";
        }

        // Either there was an error, we hit the end of the socket, or our timeout expired.
        fdevent_destroy(fde);
        delete socket_info;
    };

    ClosingSocket* socket_info = new ClosingSocket{
            .begin = std::chrono::steady_clock::now(),
    };

    fdevent* fde = fdevent_create(fd.release(), callback, socket_info);
    fdevent_add(fde, FDE_READ);
    fdevent_set_timeout(fde, 1s);
}

// be sure to hold the socket list lock when calling this
static void local_socket_destroy(asocket* s) {
    int exit_on_close = s->exit_on_close;

    D("LS(%d): destroying fde.fd=%d", s->id, s->fd);

    /* IMPORTANT: the remove closes the fd
    ** that belongs to this socket
    */
    fdevent_destroy(s->fde);
    deferred_close(fdevent_release(s->fde));

    remove_socket(s);
    delete s;
Loading