Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c0727c8c authored by Febin Thattil's avatar Febin Thattil
Browse files

[AOA FFS Implementation] Add accessory legacy bridge

Contains the implementation of async IO for piping USB reads from FFS to
socket and pipe writes from socket to USB FFS.

This will be used in a followup change.
Bug: 407985367
Bug: 416211970
Flag: android.hardware.usb.flags.enable_aoa_userspace_implementation
Test: No build breakage
Change-Id: If37a3f97d8ea52cf2f02207a29d662256d94bc89
parent 3ae0ee9c
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -109,6 +109,7 @@ cc_defaults {
        "libandroid_runtime",
        "libandroidfw",
        "libaudioclient",
        "libasyncio",
        "libbase",
        "libappfuse",
        "libbinder_ndk",
+326 −0
Original line number Diff line number Diff line
@@ -16,7 +16,9 @@

#define LOG_TAG "UsbDeviceManagerJNI"
#include <android-base/properties.h>
#include <android-base/scopeguard.h>
#include <android-base/unique_fd.h>
#include <asyncio/AsyncIO.h>
#include <core_jni_helpers.h>
#include <fcntl.h>
#include <linux/uhid.h>
@@ -25,7 +27,10 @@
#include <nativehelper/ScopedUtfChars.h>
#include <stdio.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/ioctl.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/types.h>

@@ -927,6 +932,327 @@ public:
};
static std::unique_ptr<NativeVendorControlRequestMonitorThread> sVendorControlRequestMonitorThread;

/*
 * NativeAccessoryLegacyBridgeThread starts a new thread to monitor accessory endpoints
 * and pass on the data to the legacy accessory mode. It also monitors if client has closed the
 * legacy accessory fd and informs the device manager to tear down accessory mode.
 */
class NativeAccessoryLegacyBridgeThread {
    android::base::unique_fd mFfsReadFd;
    android::base::unique_fd mFfsWriteFd;
    int mAppSocketFd;
    int mMaxPacketSize;
    int mShutdownPipeFd[2];
    std::thread mThread;
    jobject mCallbackObj;

    std::vector<struct iocb> mIocb;
    std::vector<struct iocb*> mIocbs;
    std::vector<char> mData;
    std::vector<struct io_event> mEvents;
    android::base::unique_fd mReadEventFd;
    aio_context_t mReadCtx = 0;
    aio_context_t mWriteCtx = 0;

    static constexpr int USB_FFS_NUM_BUFS = 16;
    static constexpr int USB_FFS_BUF_SIZE = 16384;
    static constexpr int USB_FFS_ALL_BUFS_SIZE = USB_FFS_NUM_BUFS * USB_FFS_BUF_SIZE;
    static constexpr int USB_FFS_BUF_WRITE_OFFSET = USB_FFS_ALL_BUFS_SIZE;
    struct timespec ZERO_TIMEOUT = {0, 0};

    // TODO: b/440767444 - Explore use of other options in place of AIO
    int iobufSubmit(aio_context_t ctx, struct iocb **iocbs, int fd, char *buf, int length, int evfd,
                    bool read, int maxPacketSize) {
        int prepared_data_iocb_count = 0;
        size_t bytes_assigned = 0;

        // Determine if a ZLP might be needed after this data transfer (only for writes)
        bool zlp_conditionally_needed =
                (!read && length > 0 && (length % static_cast<size_t>(maxPacketSize) == 0));

        int max_iocbs_for_data_payload = USB_FFS_NUM_BUFS;
        if (zlp_conditionally_needed) {
            max_iocbs_for_data_payload = USB_FFS_NUM_BUFS - 1; // Reserve one slot for ZLP
        }

        for (int j = 0; j < max_iocbs_for_data_payload; j++) {
            if (bytes_assigned >= length) {
                break;
            }
            size_t current_chunk_size =
                    std::min(static_cast<size_t>(USB_FFS_BUF_SIZE), length - bytes_assigned);

            io_prep(iocbs[j], fd, buf + bytes_assigned, current_chunk_size, /* offset= */ 0, read);
            iocbs[j]->aio_data = reinterpret_cast<uint64_t>(iocbs[j]);

            // TODO: b/440765338 - Improve use of eventfd path
            if (evfd != -1) { // ensure eventfd is set for current IOCB
                iocbs[j]->aio_flags |= IOCB_FLAG_RESFD;
                iocbs[j]->aio_resfd = evfd;
            }

            bytes_assigned += current_chunk_size;
            prepared_data_iocb_count++;
        }

        int total_iocbs_to_submit_in_batch = prepared_data_iocb_count;
        bool zlp_actually_appended = false;

        // TODO: b/440767232 - Implement rescheduled ZLP
        if (prepared_data_iocb_count > 0 && zlp_conditionally_needed) {
            if (prepared_data_iocb_count < USB_FFS_NUM_BUFS) { // Space for ZLP IOCB
                io_prep_pwrite(iocbs[prepared_data_iocb_count], fd, nullptr, 0, 0); // ZLP
                iocbs[prepared_data_iocb_count]->aio_data =
                        reinterpret_cast<uint64_t>(iocbs[prepared_data_iocb_count]);

                // TODO: b/440765338 - Improve use of eventfd path
                if (evfd != -1) {
                    iocbs[prepared_data_iocb_count]->aio_flags |= IOCB_FLAG_RESFD;
                    iocbs[prepared_data_iocb_count]->aio_resfd = evfd;
                }
                total_iocbs_to_submit_in_batch++;
                zlp_actually_appended = true;
            }
        }

        int submitted_count = io_submit(ctx, total_iocbs_to_submit_in_batch, iocbs);

        if (submitted_count < 0) {
            ALOGE("io_submit for %s failed (prepared %d): %d - %s", read ? "read" : "write",
                  total_iocbs_to_submit_in_batch, submitted_count, strerror(errno));

            return submitted_count;
        }

        if (submitted_count != total_iocbs_to_submit_in_batch) {
            ALOGW("io_submit only enqueued %d of %d prepared IOCBs", submitted_count,
                  total_iocbs_to_submit_in_batch);
            if (zlp_actually_appended) {
                // This implies ZLP might not have been submitted.
                ALOGE("ZLP was prepared but might not have been submitted due to short submit!");
            }
        }
        return submitted_count;
    }

    bool handleReadEvents() {
        uint64_t ev_cnt = 0;
        if (::read(mReadEventFd, &ev_cnt, sizeof(ev_cnt)) == -1) {
            ALOGE("unable to read eventfd: %s", strerror(errno));
            return false;
        }

        int num_events = TEMP_FAILURE_RETRY(
                io_getevents(mReadCtx, 0, USB_FFS_NUM_BUFS, mEvents.data(), &ZERO_TIMEOUT));
        if (num_events < 0) {
            ALOGE("error getting events: %s", strerror(errno));
            return false;
        }

        // Process all completed events
        for (int i = 0; i < num_events; i++) {
            struct iocb* completed_iocb = reinterpret_cast<struct iocb*>(mEvents[i].obj);
            long result = mEvents[i].res;

            if (result < 0) {
                ALOGE("got error event on read: %s", strerror(-result));
                return false;
            }

            if (!completed_iocb) {
                ALOGE("mEvents[%d].obj is NULL! Skipping this event.", i);
                continue;
            }

            // Write completed data to the socket
            int ret = write(mAppSocketFd, reinterpret_cast<void*>(completed_iocb->aio_buf), result);
            if (ret < 0) {
                ALOGE("error writing to socket: %s", strerror(errno));
                return false;
            }

            // Re-submit the iocb for another read
            io_prep_pread(completed_iocb, mFfsReadFd.get(),
                    reinterpret_cast<char*>(completed_iocb->aio_buf), USB_FFS_BUF_SIZE, 0);
            completed_iocb->aio_data = reinterpret_cast<uint64_t>(completed_iocb);
            completed_iocb->aio_flags |= IOCB_FLAG_RESFD;
            completed_iocb->aio_resfd = mReadEventFd.get();

            struct iocb* iocbs_to_submit[] = {completed_iocb};
            int submitted_count = io_submit(mReadCtx, 1, iocbs_to_submit);
            if (submitted_count < 0) {
                ALOGE("Error re-submitting read iocb: %d - %s", submitted_count, strerror(errno));
                return false;
            }
            if (submitted_count != 1) {
                ALOGW("Could not re-submit read iocb");
            }
        }
        return true;
    }

    bool handleSocketInput() {
        char* write_buf = mData.data() + USB_FFS_BUF_WRITE_OFFSET;
        int ret = read(mAppSocketFd, write_buf, USB_FFS_ALL_BUFS_SIZE);

        if (ret == 0) {
            ALOGE("accessory socket closed by client");
            return false;
        }
        if (ret < 0) {
            ALOGE("accessory socket read failed: %s", strerror(errno));
            return false;
        }

        struct iocb** write_iocbs = mIocbs.data() + USB_FFS_NUM_BUFS;
        ret = iobufSubmit(mWriteCtx, write_iocbs, mFfsWriteFd.get(), write_buf, ret, -1, false,
                          mMaxPacketSize);
        if (ret < 0) {
            return false;
        }

        int this_events =
                TEMP_FAILURE_RETRY(io_getevents(mWriteCtx, ret, ret, mEvents.data(), nullptr));
        if (this_events < ret) {
            ALOGE("got error waiting for write to complete, expected %d, got %d", ret, this_events);
            return false;
        }
        for (int i = 0; i < ret; i++) {
            if (mEvents[i].res < 0) {
                ALOGE("got error event on %d of %d write events: %s", i, ret,
                      strerror(-mEvents[i].res));
                return false;
            }
        }
        return true;
    }

    void monitorLoop() {
        struct pollfd pollFds[3];

        JNIEnv* env = nullptr;
        JavaVMAttachArgs aargs = {JNI_VERSION_1_4, "NativeAccessoryLegacyBridgeThread", nullptr};
        if (gvm->AttachCurrentThread(&env, &aargs) != JNI_OK || env == nullptr) {
            ALOGE("Couldn't attach thread");
            return;
        }
        auto detach = android::base::make_scope_guard([&] { gvm->DetachCurrentThread(); });

        pollFds[0].fd = mReadEventFd.get();
        pollFds[0].events = POLLIN;
        pollFds[1].fd = mAppSocketFd;
        pollFds[1].events = POLLIN | POLLHUP | POLLRDHUP | POLLERR;
        pollFds[2].fd = mShutdownPipeFd[0];
        pollFds[2].events = POLLIN;

        struct iocb** read_iocbs = mIocbs.data();
        char* read_buf = mData.data();
        int ret;

        ret = iobufSubmit(mReadCtx, read_iocbs, mFfsReadFd.get(), read_buf, USB_FFS_ALL_BUFS_SIZE,
                          mReadEventFd.get(), true, mMaxPacketSize);
        if (ret < 0) {
            ALOGE("accessory legacy bridge initial read iobufSubmit failed");
            return;
        }
        ALOGI("accessory legacy bridge thread loop starting...");

        while (true) {
            if (poll(pollFds, 3, -1) == -1) {
                if (errno == EINTR) {
                    continue;
                }
                ALOGE("Error during poll: %s", strerror(errno));
                break;
            }

            if (pollFds[0].revents & POLLIN) {
                pollFds[0].revents = 0;
                if (!handleReadEvents()) {
                    goto exit;
                }
            }

            if (pollFds[1].revents & POLLIN) {
                pollFds[1].revents = 0;
                if (!handleSocketInput()) {
                    goto exit;
                }
            }

            if (pollFds[1].revents & (POLLHUP | POLLERR | POLLRDHUP)) {
                pollFds[1].revents = 0;
                ALOGI("accessory socket shut down: %s", strerror(errno));
                goto exit;
            }

            if (pollFds[2].revents & POLLIN) {
                ALOGI("Shutdown signaled for accessory legacy bridge thread.");
                goto exit;
            }
        }
    exit:
        ALOGI("accessory socket thread exiting using readFd %d and writeFd %d", mFfsReadFd.get(),
              mFfsWriteFd.get());
    }

    void stop() {
        if (mThread.joinable()) {
            int c = 'q';
            write(mShutdownPipeFd[1], &c, 1);
            mThread.join();
        }
    }

    DISALLOW_COPY_AND_ASSIGN(NativeAccessoryLegacyBridgeThread);

public:
    explicit NativeAccessoryLegacyBridgeThread(jobject obj, android::base::unique_fd ffs_read_fd,
                                               android::base::unique_fd ffs_write_fd,
                                               int app_socket_end, int max_packet_size)
          : mFfsReadFd(std::move(ffs_read_fd)),
            mFfsWriteFd(std::move(ffs_write_fd)),
            mAppSocketFd(app_socket_end),
            mMaxPacketSize(max_packet_size) {
        mCallbackObj = AndroidRuntime::getJNIEnv()->NewGlobalRef(obj);

        mData.resize(USB_FFS_ALL_BUFS_SIZE * 2);
        mIocb.resize(USB_FFS_NUM_BUFS * 2);
        mIocbs.resize(USB_FFS_NUM_BUFS * 2);
        mEvents.resize(USB_FFS_NUM_BUFS);
        for (unsigned i = 0; i < USB_FFS_NUM_BUFS * 2; i++) {
            mIocbs[i] = &mIocb[i];
        }

        if (io_setup(USB_FFS_NUM_BUFS, &mReadCtx) < 0 ||
            io_setup(USB_FFS_NUM_BUFS, &mWriteCtx) < 0) {
            ALOGE("unable to setup aio");
            return;
        }

        mReadEventFd.reset(eventfd(0, 0));

        pipe(mShutdownPipeFd);
        mThread = std::thread(&NativeAccessoryLegacyBridgeThread::monitorLoop, this);
    }


    ~NativeAccessoryLegacyBridgeThread() {
        ALOGD("tearing down NativeAccessoryLegacyBridgeThread...");
        stop();
        close(mShutdownPipeFd[0]);
        close(mShutdownPipeFd[1]);
        if (mReadCtx != 0) {
            if (io_destroy(mReadCtx) < 0) ALOGE("io_destroy read_ctx failed");
        }
        if (mWriteCtx != 0) {
            if (io_destroy(mWriteCtx) < 0) ALOGE("io_destroy write_ctx failed");
        }
        AndroidRuntime::getJNIEnv()->DeleteGlobalRef(mCallbackObj);
    }
};
static std::unique_ptr<NativeAccessoryLegacyBridgeThread> sAccessoryLegacyBridgeThread;

static void set_accessory_string(JNIEnv *env, int fd, int cmd, jobjectArray strArray, int index)
{
    char buffer[256];