
Commit d484e2fa authored by Steven Moreland, committed by Automerger Merge Worker

Merge "libbinder: reverse connections" am: e81fe829 am: e8663b69 am: 4da487f2 am: 5cf0085c

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1702014

Change-Id: Iba15250f579fbca9eece80abdedb2523a6b514ad
parents f2becc23 5cf0085c
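
This change teaches libbinder's socket RPC stack about "reverse" connections: a client can open extra sockets that the server side keeps as outgoing (client-style) connections on its copy of the session, giving the server a path for calls back into the client process. The sketch below is illustrative only and is not part of the commit; it exercises the client-side APIs added here (RpcSession::setMaxReverseConnections and RpcSession::shutdown), and the socket path is a made-up placeholder.

    // Hypothetical client-side setup, assuming a Unix-domain RPC service; error
    // handling and the service path are placeholders, not taken from this change.
    #include <binder/RpcSession.h>

    using android::RpcSession;
    using android::sp;

    bool connectWithReverseConnection() {
        sp<RpcSession> session = RpcSession::make();

        // Added in this change: must be called before any client connection is set
        // up (the setter aborts otherwise). Each reverse connection is a socket the
        // server may later use for calls back into this process.
        session->setMaxReverseConnections(1);

        if (!session->setupUnixDomainClient("/dev/socket/example_rpc")) return false;

        // ... fetch the root object and transact as usual ...

        // Also added in this change: client sessions can be shut down explicitly;
        // this waits for the threads serving reverse connections to wind down.
        return session->shutdown();
    }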
RpcServer.cpp (+24 −11)
@@ -239,15 +239,16 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
     // It must be set before this thread is started
     LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);
 
-    int32_t id;
-    status_t status =
-            server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &id, sizeof(id));
+    RpcConnectionHeader header;
+    status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header,
+                                                                       sizeof(header));
     bool idValid = status == OK;
     if (!idValid) {
         ALOGE("Failed to read ID for client connecting to RPC server: %s",
               statusToString(status).c_str());
         // still need to cleanup before we can return
     }
+    bool reverse = header.options & RPC_CONNECTION_OPTION_REVERSE;
 
     std::thread thisThread;
     sp<RpcSession> session;
@@ -269,24 +270,37 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
             return;
         }
 
-        if (id == RPC_SESSION_ID_NEW) {
+        if (header.sessionId == RPC_SESSION_ID_NEW) {
+            if (reverse) {
+                ALOGE("Cannot create a new session with a reverse connection, would leak");
+                return;
+            }
+
             LOG_ALWAYS_FATAL_IF(server->mSessionIdCounter >= INT32_MAX, "Out of session IDs");
             server->mSessionIdCounter++;
 
             session = RpcSession::make();
-            session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter,
-                                  server->mShutdownTrigger);
+            session->setForServer(server,
+                                  sp<RpcServer::EventListener>::fromExisting(
+                                          static_cast<RpcServer::EventListener*>(server.get())),
+                                  server->mSessionIdCounter, server->mShutdownTrigger);
 
             server->mSessions[server->mSessionIdCounter] = session;
         } else {
-            auto it = server->mSessions.find(id);
+            auto it = server->mSessions.find(header.sessionId);
            if (it == server->mSessions.end()) {
-                ALOGE("Cannot add thread, no record of session with ID %d", id);
+                ALOGE("Cannot add thread, no record of session with ID %d", header.sessionId);
                 return;
             }
             session = it->second;
         }
 
+        if (reverse) {
+            LOG_ALWAYS_FATAL_IF(!session->addClientConnection(std::move(clientFd)),
+                                "server state must already be initialized");
+            return;
+        }
+
         detachGuard.Disable();
         session->preJoin(std::move(thisThread));
     }
@@ -294,7 +308,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
     // avoid strong cycle
     server = nullptr;
 
-    session->join(std::move(clientFd));
+    RpcSession::join(std::move(session), std::move(clientFd));
 }
 
 bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
@@ -341,8 +355,7 @@ void RpcServer::onSessionLockedAllServerThreadsEnded(const sp<RpcSession>& sessi
     (void)mSessions.erase(it);
 }
 
-void RpcServer::onSessionServerThreadEnded(const sp<RpcSession>& session) {
-    (void)session;
+void RpcServer::onSessionServerThreadEnded() {
     mShutdownCv.notify_all();
 }
 
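On the server side, a connection whose header carries RPC_CONNECTION_OPTION_REVERSE is never given a serving thread; establishConnection stores the accepted socket as a client connection on the existing session, so outgoing transactions from the server have a connection to travel over. A hedged sketch of what that enables, using only generic IBinder calls (the transaction code, and the idea that the client registered this binder beforehand, are assumptions and not part of the commit):

    #include <binder/IBinder.h>
    #include <binder/Parcel.h>

    using android::IBinder;
    using android::Parcel;
    using android::sp;
    using android::status_t;

    // Hypothetical transaction code; a real service would go through an AIDL
    // interface, which also takes care of preparing the Parcel for RPC use.
    constexpr uint32_t kNotifyTransaction = IBinder::FIRST_CALL_TRANSACTION;

    // 'clientBinder' is assumed to be a binder the client previously handed to the
    // server over the forward connection. Transacting on it from the server routes
    // through RpcSession::transact, whose ExclusiveConnection requires at least one
    // client connection on the session, which is exactly what a reverse connection
    // provides (see the LOG_ALWAYS_FATAL_IF near the end of the RpcSession.cpp diff).
    status_t notifyClient(const sp<IBinder>& clientBinder) {
        Parcel data, reply;
        return clientBinder->transact(kNotifyTransaction, data, &reply);
    }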
RpcSession.cpp (+119 −24)
@@ -59,6 +59,17 @@ sp<RpcSession> RpcSession::make() {
     return sp<RpcSession>::make();
 }
 
+void RpcSession::setMaxReverseConnections(size_t connections) {
+    {
+        std::lock_guard<std::mutex> _l(mMutex);
+        LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0,
+                            "Must setup reverse connections before setting up client connections, "
+                            "but already has %zu clients",
+                            mClientConnections.size());
+    }
+    mMaxReverseConnections = connections;
+}
+
 bool RpcSession::setupUnixDomainClient(const char* path) {
     return setupSocketClient(UnixSocketAddress(path));
 }
@@ -99,6 +110,20 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
     return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
 }
 
+bool RpcSession::shutdown() {
+    std::unique_lock<std::mutex> _l(mMutex);
+    LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session");
+    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
+    LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
+
+    mShutdownTrigger->trigger();
+    mShutdownListener->waitForShutdown(_l);
+    mState->terminate();
+
+    LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
+    return true;
+}
+
 status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
                               Parcel* reply, uint32_t flags) {
     ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
@@ -179,6 +204,24 @@ status_t RpcSession::readId() {
     return OK;
 }
 
+void RpcSession::WaitForShutdownListener::onSessionLockedAllServerThreadsEnded(
+        const sp<RpcSession>& session) {
+    (void)session;
+    mShutdown = true;
+}
+
+void RpcSession::WaitForShutdownListener::onSessionServerThreadEnded() {
+    mCv.notify_all();
+}
+
+void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock) {
+    while (!mShutdown) {
+        if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) {
+            ALOGE("Waiting for RpcSession to shut down (1s w/o progress).");
+        }
+    }
+}
+
 void RpcSession::preJoin(std::thread thread) {
     LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");
 
@@ -188,14 +231,13 @@ void RpcSession::preJoin(std::thread thread) {
     }
 }
 
-void RpcSession::join(unique_fd client) {
+void RpcSession::join(sp<RpcSession>&& session, unique_fd client) {
     // must be registered to allow arbitrary client code executing commands to
     // be able to do nested calls (we can't only read from it)
-    sp<RpcConnection> connection = assignServerToThisThread(std::move(client));
+    sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client));
 
     while (true) {
-        status_t error =
-                state()->getAndExecuteCommand(connection->fd, sp<RpcSession>::fromExisting(this));
+        status_t error = session->state()->getAndExecuteCommand(connection->fd, session);
 
         if (error != OK) {
             LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
@@ -204,22 +246,24 @@ void RpcSession::join(unique_fd client) {
         }
     }
 
-    LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection),
+    LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection),
                         "bad state: connection object guaranteed to be in list");
 
-    sp<RpcServer> server;
+    sp<RpcSession::EventListener> listener;
     {
-        std::lock_guard<std::mutex> _l(mMutex);
-        auto it = mThreads.find(std::this_thread::get_id());
-        LOG_ALWAYS_FATAL_IF(it == mThreads.end());
+        std::lock_guard<std::mutex> _l(session->mMutex);
+        auto it = session->mThreads.find(std::this_thread::get_id());
+        LOG_ALWAYS_FATAL_IF(it == session->mThreads.end());
         it->second.detach();
-        mThreads.erase(it);
+        session->mThreads.erase(it);
 
-        server = mForServer.promote();
+        listener = session->mEventListener.promote();
     }
 
-    if (server != nullptr) {
-        server->onSessionServerThreadEnded(sp<RpcSession>::fromExisting(this));
+    session = nullptr;
+
+    if (listener != nullptr) {
+        listener->onSessionServerThreadEnded();
     }
 }
 
@@ -235,7 +279,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
                             mClientConnections.size());
     }
 
-    if (!setupOneSocketClient(addr, RPC_SESSION_ID_NEW)) return false;
+    if (!setupOneSocketConnection(addr, RPC_SESSION_ID_NEW, false /*reverse*/)) return false;
 
     // TODO(b/185167543): we should add additional sessions dynamically
     // instead of all at once.
@@ -256,13 +300,23 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
     // we've already setup one client
     for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
         // TODO(b/185167543): shutdown existing connections?
-        if (!setupOneSocketClient(addr, mId.value())) return false;
+        if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false;
+    }
+
+    // TODO(b/185167543): we should add additional sessions dynamically
+    // instead of all at once - the other side should be responsible for setting
+    // up additional connections. We need to create at least one (unless 0 are
+    // requested to be set) in order to allow the other side to reliably make
+    // any requests at all.
+
+    for (size_t i = 0; i < mMaxReverseConnections; i++) {
+        if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false;
     }
 
     return true;
 }
 
-bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id) {
+bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t id, bool reverse) {
     for (size_t tries = 0; tries < 5; tries++) {
         if (tries > 0) usleep(10000);
 
@@ -286,17 +340,48 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id)
             return false;
         }
 
-        if (sizeof(id) != TEMP_FAILURE_RETRY(write(serverFd.get(), &id, sizeof(id)))) {
+        RpcConnectionHeader header{
+                .sessionId = id,
+        };
+        if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE;
+
+        if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
             int savedErrno = errno;
-            ALOGE("Could not write id to socket at %s: %s", addr.toString().c_str(),
+            ALOGE("Could not write connection header to socket at %s: %s", addr.toString().c_str(),
                   strerror(savedErrno));
             return false;
         }
 
         LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
 
-        return addClientConnection(std::move(serverFd));
+        if (reverse) {
+            std::mutex mutex;
+            std::condition_variable joinCv;
+            std::unique_lock<std::mutex> lock(mutex);
+            std::thread thread;
+            sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this);
+            bool ownershipTransferred = false;
+            thread = std::thread([&]() {
+                std::unique_lock<std::mutex> threadLock(mutex);
+                unique_fd fd = std::move(serverFd);
+                // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
+                sp<RpcSession> session = thiz;
+                session->preJoin(std::move(thread));
+                ownershipTransferred = true;
+                joinCv.notify_one();
+
+                threadLock.unlock();
+                // do not use & vars below
+
+                RpcSession::join(std::move(session), std::move(fd));
+            });
+            joinCv.wait(lock, [&] { return ownershipTransferred; });
+            LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
+            return true;
+        } else {
+            return addClientConnection(std::move(serverFd));
+        }
     }
 
     ALOGE("Ran out of retries to connect to %s", addr.toString().c_str());
     return false;
@@ -305,8 +390,11 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id)
 bool RpcSession::addClientConnection(unique_fd fd) {
     std::lock_guard<std::mutex> _l(mMutex);
 
+    // first client connection added, but setForServer not called, so
+    // initializaing for a client.
     if (mShutdownTrigger == nullptr) {
         mShutdownTrigger = FdTrigger::make();
+        mEventListener = mShutdownListener = sp<WaitForShutdownListener>::make();
         if (mShutdownTrigger == nullptr) return false;
     }
 
@@ -316,14 +404,19 @@ bool RpcSession::addClientConnection(unique_fd fd) {
     return true;
 }
 
-void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId,
+void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
+                              int32_t sessionId,
                               const std::shared_ptr<FdTrigger>& shutdownTrigger) {
-    LOG_ALWAYS_FATAL_IF(mForServer.unsafe_get() != nullptr);
+    LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
+    LOG_ALWAYS_FATAL_IF(server == nullptr);
+    LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
+    LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
     LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
     LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);
 
     mId = sessionId;
     mForServer = server;
+    mEventListener = eventListener;
     mShutdownTrigger = shutdownTrigger;
 }
 
@@ -343,9 +436,9 @@ bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) {
         it != mServerConnections.end()) {
         mServerConnections.erase(it);
         if (mServerConnections.size() == 0) {
-            sp<RpcServer> server = mForServer.promote();
-            if (server) {
-                server->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this));
+            sp<EventListener> listener = mEventListener.promote();
+            if (listener) {
+                listener->onSessionLockedAllServerThreadsEnded(sp<RpcSession>::fromExisting(this));
             }
         }
         return true;
@@ -405,6 +498,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi
             break;
         }
 
+        // TODO(b/185167543): this should return an error, rather than crash a
+        // server
         // in regular binder, this would usually be a deadlock :)
         LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0,
                             "Session has no client connections. This is required for an RPC server "
RpcState.cpp (+1 −0)
@@ -383,6 +383,7 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress&
         return status;
 
     if (flags & IBinder::FLAG_ONEWAY) {
+        LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get());
         return OK; // do not wait for result
     }
 
RpcState.h (+1 −1)
@@ -86,7 +86,6 @@ public:
     size_t countBinders();
     void dump();
 
-private:
     /**
      * Called when reading or writing data to a session fails to clean up
      * data associated with the session in order to cleanup binders.
@@ -105,6 +104,7 @@ private:
      */
     void terminate();
 
+private:
     // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
     // large allocations to avoid being requested from allocating too much data.
     struct CommandData {
RpcWireFormat.h (+12 −2)
@@ -20,6 +20,18 @@ namespace android {
 #pragma clang diagnostic push
 #pragma clang diagnostic error "-Wpadded"
 
+constexpr int32_t RPC_SESSION_ID_NEW = -1;
+
+enum : uint8_t {
+    RPC_CONNECTION_OPTION_REVERSE = 0x1,
+};
+
+struct RpcConnectionHeader {
+    int32_t sessionId;
+    uint8_t options;
+    uint8_t reserved[3];
+};
+
 enum : uint32_t {
     /**
      * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected
@@ -51,8 +63,6 @@ enum : uint32_t {
     RPC_SPECIAL_TRANSACT_GET_SESSION_ID = 2,
 };
 
-constexpr int32_t RPC_SESSION_ID_NEW = -1;
-
 // serialization is like:
 // |RpcWireHeader|struct desginated by 'command'| (over and over again)
 
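With RpcConnectionHeader in place, the connection handshake is a single fixed-size write from the connecting side before any RPC commands flow, matching what RpcSession::setupOneSocketConnection sends and RpcServer::establishConnection reads above. A minimal standalone sketch of that write, assuming an already-connected POSIX socket and that TEMP_FAILURE_RETRY is available (as on bionic and glibc):

    #include <cstdint>
    #include <unistd.h>

    // Mirrors the struct added above: 8 bytes, explicitly padded to satisfy -Wpadded.
    struct RpcConnectionHeader {
        int32_t sessionId;
        uint8_t options;
        uint8_t reserved[3];
    };

    constexpr int32_t kRpcSessionIdNew = -1;             // RPC_SESSION_ID_NEW
    constexpr uint8_t kRpcConnectionOptionReverse = 0x1; // RPC_CONNECTION_OPTION_REVERSE

    // Writes the one-shot connection header. sessionId is kRpcSessionIdNew for the
    // first (forward) connection of a session; a reverse connection must reuse an
    // existing session's ID, since the server rejects REVERSE on a new session.
    bool sendConnectionHeader(int fd, int32_t sessionId, bool reverse) {
        RpcConnectionHeader header = {};
        header.sessionId = sessionId;
        if (reverse) header.options |= kRpcConnectionOptionReverse;
        return TEMP_FAILURE_RETRY(write(fd, &header, sizeof(header))) ==
               static_cast<ssize_t>(sizeof(header));
    }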