Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1042306b authored by Yifan Hong's avatar Yifan Hong
Browse files

binder: RpcSession::*MaxThreads -> *MaxIncomingThreads

We'll add a separate number for outgoing threads

Bug: 194225767
Test: pass

Change-Id: I7bf178c098adc6359582792a2f1ca1248a336b9f
parent a25b0120
Loading
Loading
Loading
Loading
+1 −1
Original line number Original line Diff line number Diff line
@@ -381,7 +381,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
            } while (server->mSessions.end() != server->mSessions.find(sessionId));
            } while (server->mSessions.end() != server->mSessions.find(sessionId));


            session = RpcSession::make();
            session = RpcSession::make();
            session->setMaxThreads(server->mMaxThreads);
            session->setMaxIncomingThreads(server->mMaxThreads);
            if (!session->setProtocolVersion(protocolVersion)) return;
            if (!session->setProtocolVersion(protocolVersion)) return;
            if (!session->setForServer(server,
            if (!session->setForServer(server,
                                       sp<RpcServer::EventListener>::fromExisting(
                                       sp<RpcServer::EventListener>::fromExisting(
+9 −9
Original line number Original line Diff line number Diff line
@@ -76,18 +76,18 @@ sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTrans
    return sp<RpcSession>::make(std::move(ctx));
    return sp<RpcSession>::make(std::move(ctx));
}
}


void RpcSession::setMaxThreads(size_t threads) {
void RpcSession::setMaxIncomingThreads(size_t threads) {
    std::lock_guard<std::mutex> _l(mMutex);
    std::lock_guard<std::mutex> _l(mMutex);
    LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(),
    LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(),
                        "Must set max threads before setting up connections, but has %zu client(s) "
                        "Must set max incoming threads before setting up connections, but has %zu "
                        "and %zu server(s)",
                        "client(s) and %zu server(s)",
                        mConnections.mOutgoing.size(), mConnections.mIncoming.size());
                        mConnections.mOutgoing.size(), mConnections.mIncoming.size());
    mMaxThreads = threads;
    mMaxIncomingThreads = threads;
}
}


size_t RpcSession::getMaxThreads() {
size_t RpcSession::getMaxIncomingThreads() {
    std::lock_guard<std::mutex> _l(mMutex);
    std::lock_guard<std::mutex> _l(mMutex);
    return mMaxThreads;
    return mMaxIncomingThreads;
}
}


bool RpcSession::setProtocolVersion(uint32_t version) {
bool RpcSession::setProtocolVersion(uint32_t version) {
@@ -484,7 +484,7 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector<
        if (status_t status = connectAndInit(mId, false /*incoming*/); status != OK) return status;
        if (status_t status = connectAndInit(mId, false /*incoming*/); status != OK) return status;
    }
    }


    for (size_t i = 0; i < mMaxThreads; i++) {
    for (size_t i = 0; i < mMaxIncomingThreads; i++) {
        if (status_t status = connectAndInit(mId, true /*incoming*/); status != OK) return status;
        if (status_t status = connectAndInit(mId, true /*incoming*/); status != OK) return status;
    }
    }


@@ -697,9 +697,9 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
        std::unique_ptr<RpcTransport> rpcTransport) {
        std::unique_ptr<RpcTransport> rpcTransport) {
    std::lock_guard<std::mutex> _l(mMutex);
    std::lock_guard<std::mutex> _l(mMutex);


    if (mConnections.mIncoming.size() >= mMaxThreads) {
    if (mConnections.mIncoming.size() >= mMaxIncomingThreads) {
        ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)",
        ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)",
              mConnections.mIncoming.size(), mMaxThreads);
              mConnections.mIncoming.size(), mMaxIncomingThreads);
        return nullptr;
        return nullptr;
    }
    }


+1 −1
Original line number Original line Diff line number Diff line
@@ -855,7 +855,7 @@ processTransactInternalTailCall:


            switch (transaction->code) {
            switch (transaction->code) {
                case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: {
                case RPC_SPECIAL_TRANSACT_GET_MAX_THREADS: {
                    replyStatus = reply.writeInt32(session->getMaxThreads());
                    replyStatus = reply.writeInt32(session->getMaxIncomingThreads());
                    break;
                    break;
                }
                }
                case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: {
                case RPC_SPECIAL_TRANSACT_GET_SESSION_ID: {
+4 −4
Original line number Original line Diff line number Diff line
@@ -59,7 +59,7 @@ public:
    static sp<RpcSession> make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);
    static sp<RpcSession> make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory);


    /**
    /**
     * Set the maximum number of threads allowed to be made (for things like callbacks).
     * Set the maximum number of incoming threads allowed to be made (for things like callbacks).
     * By default, this is 0. This must be called before setting up this connection as a client.
     * By default, this is 0. This must be called before setting up this connection as a client.
     * Server sessions will inherits this value from RpcServer.
     * Server sessions will inherits this value from RpcServer.
     *
     *
@@ -68,8 +68,8 @@ public:
     *
     *
     * TODO(b/189955605): start these dynamically
     * TODO(b/189955605): start these dynamically
     */
     */
    void setMaxThreads(size_t threads);
    void setMaxIncomingThreads(size_t threads);
    size_t getMaxThreads();
    size_t getMaxIncomingThreads();


    /**
    /**
     * By default, the minimum of the supported versions of the client and the
     * By default, the minimum of the supported versions of the client and the
@@ -307,7 +307,7 @@ private:


    std::mutex mMutex; // for all below
    std::mutex mMutex; // for all below


    size_t mMaxThreads = 0;
    size_t mMaxIncomingThreads = 0;
    std::optional<uint32_t> mProtocolVersion;
    std::optional<uint32_t> mProtocolVersion;


    std::condition_variable mAvailableConnectionCv; // for mWaitingThreads
    std::condition_variable mAvailableConnectionCv; // for mWaitingThreads
+1 −1
Original line number Original line Diff line number Diff line
@@ -613,7 +613,7 @@ public:
        status_t status;
        status_t status;


        for (const auto& session : sessions) {
        for (const auto& session : sessions) {
            session->setMaxThreads(options.numIncomingConnections);
            session->setMaxIncomingThreads(options.numIncomingConnections);


            switch (socketType) {
            switch (socketType) {
                case SocketType::PRECONNECTED:
                case SocketType::PRECONNECTED: