Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9d11b923 authored by Steven Moreland's avatar Steven Moreland
Browse files

libbinder: RpcServer - shutdown connection threads

Re-use FdTrigger to shutdown connection threads.

Coming next - shutting down sessions as well!

Bug: 185167543
Test: binderRpcTest, binder_rpc_fuzzer
Change-Id: I238f1e2a5f69fdec09ac8b3afc484ab8639852fa
parent c77116c3
Loading
Loading
Loading
Loading
+29 −7
Original line number Original line Diff line number Diff line
@@ -184,10 +184,18 @@ bool RpcServer::acceptOne() {


bool RpcServer::shutdown() {
bool RpcServer::shutdown() {
    std::unique_lock<std::mutex> _l(mLock);
    std::unique_lock<std::mutex> _l(mLock);
    if (mShutdownTrigger == nullptr) return false;
    if (mShutdownTrigger == nullptr) {
        LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed.");
        return false;
    }


    mShutdownTrigger->trigger();
    mShutdownTrigger->trigger();
    while (mJoinThreadRunning) mShutdownCv.wait(_l);
    while (mJoinThreadRunning || !mConnectingThreads.empty()) {
        ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: "
              "%zu",
              mJoinThreadRunning, mConnectingThreads.size());
        mShutdownCv.wait(_l);
    }


    // At this point, we know join() is about to exit, but the thread that calls
    // At this point, we know join() is about to exit, but the thread that calls
    // join() may not have exited yet.
    // join() may not have exited yet.
@@ -222,17 +230,21 @@ size_t RpcServer::numUninitializedSessions() {
void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) {
void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clientFd) {
    // TODO(b/183988761): cannot trust this simple ID
    // TODO(b/183988761): cannot trust this simple ID
    LOG_ALWAYS_FATAL_IF(!server->mAgreedExperimental, "no!");
    LOG_ALWAYS_FATAL_IF(!server->mAgreedExperimental, "no!");
    bool idValid = true;

    // mShutdownTrigger can only be cleared once connection threads have joined.
    // It must be set before this thread is started
    LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);

    int32_t id;
    int32_t id;
    if (sizeof(id) != read(clientFd.get(), &id, sizeof(id))) {
    bool idValid = server->mShutdownTrigger->interruptableRecv(clientFd.get(), &id, sizeof(id));
        ALOGE("Could not read ID from fd %d", clientFd.get());
    if (!idValid) {
        idValid = false;
        ALOGE("Failed to read ID for client connecting to RPC server.");
    }
    }


    std::thread thisThread;
    std::thread thisThread;
    sp<RpcSession> session;
    sp<RpcSession> session;
    {
    {
        std::lock_guard<std::mutex> _l(server->mLock);
        std::unique_lock<std::mutex> _l(server->mLock);


        auto threadId = server->mConnectingThreads.find(std::this_thread::get_id());
        auto threadId = server->mConnectingThreads.find(std::this_thread::get_id());
        LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(),
        LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(),
@@ -241,6 +253,16 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        ScopeGuard detachGuard = [&]() { thisThread.detach(); };
        ScopeGuard detachGuard = [&]() { thisThread.detach(); };
        server->mConnectingThreads.erase(threadId);
        server->mConnectingThreads.erase(threadId);


        // TODO(b/185167543): we currently can't disable this because we don't
        // shutdown sessions as well, only the server itself. So, we need to
        // keep this separate from the detachGuard, since we temporarily want to
        // give a notification even when we pass ownership of the thread to
        // a session.
        ScopeGuard threadLifetimeGuard = [&]() {
            _l.unlock();
            server->mShutdownCv.notify_all();
        };

        if (!idValid) {
        if (!idValid) {
            return;
            return;
        }
        }
+16 −0
Original line number Original line Diff line number Diff line
@@ -144,6 +144,22 @@ bool RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
    }
    }
}
}


bool RpcSession::FdTrigger::interruptableRecv(base::borrowed_fd fd, void* data, size_t size) {
    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
    uint8_t* end = buffer + size;

    while (triggerablePollRead(fd)) {
        ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
        if (readSize < 0) {
            ALOGE("Failed to read %s", strerror(errno));
            return false;
        }
        buffer += readSize;
        if (buffer == end) return true;
    }
    return false;
}

status_t RpcSession::readId() {
status_t RpcSession::readId() {
    {
    {
        std::lock_guard<std::mutex> _l(mMutex);
        std::lock_guard<std::mutex> _l(mMutex);
+9 −0
Original line number Original line Diff line number Diff line
@@ -134,6 +134,15 @@ private:
         */
         */
        bool triggerablePollRead(base::borrowed_fd fd);
        bool triggerablePollRead(base::borrowed_fd fd);


        /**
         * Read, but allow the read to be interrupted by this trigger.
         *
         * Return:
         *   true - read succeeded at 'size'
         *   false - interrupted (failure or trigger)
         */
        bool interruptableRecv(base::borrowed_fd fd, void* data, size_t size);

    private:
    private:
        base::unique_fd mWrite;
        base::unique_fd mWrite;
        base::unique_fd mRead;
        base::unique_fd mRead;
+11 −2
Original line number Original line Diff line number Diff line
@@ -20,6 +20,7 @@
#include <binder/Parcel.h>
#include <binder/Parcel.h>
#include <binder/RpcServer.h>
#include <binder/RpcServer.h>
#include <binder/RpcSession.h>
#include <binder/RpcSession.h>
#include <fuzzer/FuzzedDataProvider.h>


#include <sys/resource.h>
#include <sys/resource.h>
#include <sys/un.h>
#include <sys/un.h>
@@ -53,6 +54,7 @@ class SomeBinder : public BBinder {


extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
    if (size > 50000) return 0;
    if (size > 50000) return 0;
    FuzzedDataProvider provider(data, size);


    unlink(kSock.c_str());
    unlink(kSock.c_str());


@@ -84,14 +86,21 @@ extern "C" int LLVMFuzzerTestOneInput(const uint8_t* data, size_t size) {
    CHECK(base::WriteFully(clientFd, &id, sizeof(id)));
    CHECK(base::WriteFully(clientFd, &id, sizeof(id)));
#endif
#endif


    CHECK(base::WriteFully(clientFd, data, size));
    bool hangupBeforeShutdown = provider.ConsumeBool();


    std::vector<uint8_t> writeData = provider.ConsumeRemainingBytes<uint8_t>();
    CHECK(base::WriteFully(clientFd, writeData.data(), writeData.size()));

    if (hangupBeforeShutdown) {
        clientFd.reset();
        clientFd.reset();
    }


    // TODO(185167543): currently this is okay because we only shutdown the one
    // TODO(185167543): currently this is okay because we only shutdown the one
    // thread, but once we can shutdown other sessions, we'll need to change
    // thread, but once we can shutdown other sessions, we'll need to change
    // this behavior in order to make sure all of the input is actually read.
    // this behavior in order to make sure all of the input is actually read.
    while (!server->shutdown()) usleep(100);
    while (!server->shutdown()) usleep(100);

    clientFd.reset();
    serverThread.join();
    serverThread.join();


    // TODO(b/185167543): better way to force a server to shutdown
    // TODO(b/185167543): better way to force a server to shutdown