Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c51726cb authored by Josh Gao's avatar Josh Gao
Browse files

adbd: implement a nonblocking USB Connection.

Implement a Connection that implements a nonblocking interface to
functionfs, to replace the existing implementation that uses two
threads that loop and call read and write respectively. The existing
implementation is vulnerable to a race condition that can occur when a
connection is terminated, where one thread can notice failure and
complete reinitialization of the USB endpoints before the other thread
noticed anything went wrong, resulting in either the first packet
coming from the other end disappearing in to the void, or the other end
getting a packet of garbage.

As a side benefit, this improves performance on walleye from:

    push 100MiB: 10 runs: median 49.48 MiB/s, mean 50.00 MiB/s, stddev: 2.77 MiB/s
    pull 100MiB: 10 runs: median 75.82 MiB/s, mean 76.18 MiB/s, stddev: 6.60 MiB/s

to:

    push 100MiB: 10 runs: median 73.90 MiB/s, mean 73.51 MiB/s, stddev: 5.26 MiB/s
    pull 100MiB: 10 runs: median 105.90 MiB/s, mean 107.19 MiB/s, stddev: 6.10 MiB/s

Test: python test_device.py
Change-Id: I9b77c1057965edfef739ed9736e5d76613adf60a
parent 61e9e39b
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ cc_defaults {
        "-Wvla",
    ],
    rtti: true,
    cpp_std: "gnu++17",

    use_version_lib: true,

@@ -313,6 +314,7 @@ cc_library_static {
    srcs: libadb_srcs + libadb_posix_srcs + [
        "daemon/auth.cpp",
        "daemon/jdwp_service.cpp",
        "daemon/usb.cpp",
        "daemon/usb_ffs.cpp",
        "daemon/usb_legacy.cpp",
    ],

adb/daemon/usb.cpp

0 → 100644
+585 −0

File added.

Preview size limit exceeded, changes collapsed.

+5 −5
Original line number Diff line number Diff line
@@ -94,8 +94,8 @@ static bool init_functionfs(struct usb_handle* h) {
    return true;
}

static void usb_ffs_open_thread(usb_handle* usb) {
    adb_thread_setname("usb ffs open");
static void usb_legacy_ffs_open_thread(usb_handle* usb) {
    adb_thread_setname("usb legacy ffs open");

    while (true) {
        // wait until the USB device needs opening
@@ -285,12 +285,12 @@ usb_handle* create_usb_handle(unsigned num_bufs, unsigned io_size) {
    return h;
}

void usb_init() {
    D("[ usb_init - using FunctionFS ]");
void usb_init_legacy() {
    D("[ usb_init - using legacy FunctionFS ]");
    dummy_fd.reset(adb_open("/dev/null", O_WRONLY | O_CLOEXEC));
    CHECK_NE(-1, dummy_fd.get());

    std::thread(usb_ffs_open_thread, create_usb_handle(USB_FFS_NUM_BUFS, USB_FFS_BULK_SIZE))
    std::thread(usb_legacy_ffs_open_thread, create_usb_handle(USB_FFS_NUM_BUFS, USB_FFS_BULK_SIZE))
            .detach();
}

+5 −3
Original line number Diff line number Diff line
@@ -52,7 +52,6 @@
#include "fdevent.h"
#include "sysdeps/chrono.h"

static void register_transport(atransport* transport);
static void remove_transport(atransport* transport);
static void transport_unref(atransport* transport);

@@ -671,7 +670,7 @@ static void transport_registration_func(int _fd, unsigned ev, void*) {
            return true;
        });
        t->connection()->SetErrorCallback([t](Connection*, const std::string& error) {
            D("%s: connection terminated: %s", t->serial.c_str(), error.c_str());
            LOG(INFO) << t->serial_name() << ": connection terminated: " << error;
            fdevent_run_on_main_thread([t]() {
                handle_offline(t);
                transport_unref(t);
@@ -730,7 +729,7 @@ void kick_all_transports() {
}

/* the fdevent select pump is single threaded */
static void register_transport(atransport* transport) {
void register_transport(atransport* transport) {
    tmsg m;
    m.transport = transport;
    m.action = 1;
@@ -758,6 +757,7 @@ static void transport_unref(atransport* t) {
    CHECK_GT(t->ref_count, 0u);
    t->ref_count--;
    if (t->ref_count == 0) {
        LOG(INFO) << "destroying transport " << t->serial_name();
        t->connection()->Stop();
#if ADB_HOST
        if (t->IsTcpDevice() && !t->kicked()) {
@@ -1293,6 +1293,7 @@ void register_usb_transport(usb_handle* usb, const char* serial, const char* dev
    register_transport(t);
}

#if ADB_HOST
// This should only be used for transports with connection_state == kCsNoPerm.
void unregister_usb_transport(usb_handle* usb) {
    std::lock_guard<std::recursive_mutex> lock(transport_lock);
@@ -1304,6 +1305,7 @@ void unregister_usb_transport(usb_handle* usb) {
        return false;
    });
}
#endif

bool check_header(apacket* p, atransport* t) {
    if (p->msg.magic != (p->msg.command ^ 0xffffffff)) {
+1 −0
Original line number Diff line number Diff line
@@ -362,6 +362,7 @@ atransport* find_transport(const char* serial);
void kick_all_tcp_devices();
void kick_all_transports();

void register_transport(atransport* transport);
void register_usb_transport(usb_handle* h, const char* serial,
                            const char* devpath, unsigned writeable);