Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 103424e0 authored by Steven Moreland's avatar Steven Moreland
Browse files

libbinder: RPC more symmetrical max threads

Now, RpcServer and RpcSession both keep track of their max threads using
the same variable, and the server can therefore request the number of
reverse connections possible.

Bug: 185167543
Test: N/A
Change-Id: Ieaff69c8c2da2faf7598aed7e862601a1fcd7a00
parent e54384bb
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -280,6 +280,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
            server->mSessionIdCounter++;

            session = RpcSession::make();
            session->setMaxThreads(server->mMaxThreads);
            session->setForServer(server,
                                  sp<RpcServer::EventListener>::fromExisting(
                                          static_cast<RpcServer::EventListener*>(server.get())),
+13 −10
Original line number Diff line number Diff line
@@ -59,15 +59,18 @@ sp<RpcSession> RpcSession::make() {
    return sp<RpcSession>::make();
}

void RpcSession::setMaxReverseConnections(size_t connections) {
    {
void RpcSession::setMaxThreads(size_t threads) {
    std::lock_guard<std::mutex> _l(mMutex);
        LOG_ALWAYS_FATAL_IF(mClientConnections.size() != 0,
                            "Must setup reverse connections before setting up client connections, "
                            "but already has %zu clients",
                            mClientConnections.size());
    LOG_ALWAYS_FATAL_IF(!mClientConnections.empty() || !mServerConnections.empty(),
                        "Must set max threads before setting up connections, but has %zu client(s) "
                        "and %zu server(s)",
                        mClientConnections.size(), mServerConnections.size());
    mMaxThreads = threads;
}
    mMaxReverseConnections = connections;

size_t RpcSession::getMaxThreads() {
    std::lock_guard<std::mutex> _l(mMutex);
    return mMaxThreads;
}

bool RpcSession::setupUnixDomainClient(const char* path) {
@@ -309,7 +312,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
    // requested to be set) in order to allow the other side to reliably make
    // any requests at all.

    for (size_t i = 0; i < mMaxReverseConnections; i++) {
    for (size_t i = 0; i < mMaxThreads; i++) {
        if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false;
    }

+26 −26
Original line number Diff line number Diff line
@@ -625,28 +625,26 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
        } else {
            LOG_RPC_DETAIL("Got special transaction %u", transaction->code);

            sp<RpcServer> server = session->server().promote();
            if (server) {
                // special case for 'zero' address (special server commands)
            switch (transaction->code) {
                    case RPC_SPECIAL_TRANSACT_GET_ROOT: {
                        replyStatus = reply.writeStrongBinder(server->getRootObject());
                        break;
                    }
                case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: {
                        replyStatus = reply.writeInt32(server->getMaxThreads());
                    replyStatus = reply.writeInt32(session->getMaxThreads());
                    break;
                }
                case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: {
                        // only sessions w/ services can be the source of a
                        // session ID (so still guarded by non-null server)
                        //
                        // sessions associated with servers must have an ID
                        // (hence abort)
                    // for client connections, this should always report the value
                    // originally returned from the server
                    int32_t id = session->mId.value();
                    replyStatus = reply.writeInt32(id);
                    break;
                }
                default: {
                    sp<RpcServer> server = session->server().promote();
                    if (server) {
                        switch (transaction->code) {
                            case RPC_SPECIAL_TRANSACT_GET_ROOT: {
                                replyStatus = reply.writeStrongBinder(server->getRootObject());
                                break;
                            }
                            default: {
                                replyStatus = UNKNOWN_TRANSACTION;
                            }
@@ -656,6 +654,8 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
                    }
                }
            }
        }
    }

    if (transaction->flags & IBinder::FLAG_ONEWAY) {
        if (replyStatus != OK) {
+6 −5
Original line number Diff line number Diff line
@@ -47,16 +47,17 @@ public:
    static sp<RpcSession> make();

    /**
     * Set the maximum number of reverse connections allowed to be made (for
     * things like callbacks). By default, this is 0. This must be called before
     * setting up this connection as a client.
     * Set the maximum number of threads allowed to be made (for things like callbacks).
     * By default, this is 0. This must be called before setting up this connection as a client.
     * Server sessions will inherits this value from RpcServer.
     *
     * If this is called, 'shutdown' on this session must also be called.
     * Otherwise, a threadpool will leak.
     *
     * TODO(b/185167543): start these dynamically
     */
    void setMaxReverseConnections(size_t connections);
    void setMaxThreads(size_t threads);
    size_t getMaxThreads();

    /**
     * This should be called once per thread, matching 'join' in the remote
@@ -257,7 +258,7 @@ private:

    std::mutex mMutex; // for all below

    size_t mMaxReverseConnections = 0;
    size_t mMaxThreads = 0;

    std::condition_variable mAvailableConnectionCv; // for mWaitingThreads
    size_t mWaitingThreads = 0;
+1 −1
Original line number Diff line number Diff line
@@ -446,7 +446,7 @@ public:

        for (size_t i = 0; i < numSessions; i++) {
            sp<RpcSession> session = RpcSession::make();
            session->setMaxReverseConnections(numReverseConnections);
            session->setMaxThreads(numReverseConnections);

            switch (socketType) {
                case SocketType::UNIX: