Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 41303f8d authored by Treehugger Robot's avatar Treehugger Robot Committed by Automerger Merge Worker
Browse files

Merge "libbinder: RPC mThreadState -> mConnections" am: a25b0120 am: 53ad1b32

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1843848

Change-Id: I0dd75f7cd590b5ff5ddd3ad05d5bdcab6ea26ee3
parents f403e54d 53ad1b32
Loading
Loading
Loading
Loading
+37 −40
Original line number Diff line number Diff line
@@ -61,7 +61,7 @@ RpcSession::~RpcSession() {
    LOG_RPC_DETAIL("RpcSession destroyed %p", this);

    std::lock_guard<std::mutex> _l(mMutex);
    LOG_ALWAYS_FATAL_IF(mThreadState.mIncomingConnections.size() != 0,
    LOG_ALWAYS_FATAL_IF(mConnections.mIncoming.size() != 0,
                        "Should not be able to destroy a session with servers in use.");
}

@@ -78,12 +78,10 @@ sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTrans

void RpcSession::setMaxThreads(size_t threads) {
    std::lock_guard<std::mutex> _l(mMutex);
    LOG_ALWAYS_FATAL_IF(!mThreadState.mOutgoingConnections.empty() ||
                                !mThreadState.mIncomingConnections.empty(),
    LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(),
                        "Must set max threads before setting up connections, but has %zu client(s) "
                        "and %zu server(s)",
                        mThreadState.mOutgoingConnections.size(),
                        mThreadState.mIncomingConnections.size());
                        mConnections.mOutgoing.size(), mConnections.mIncoming.size());
    mMaxThreads = threads;
}

@@ -197,7 +195,7 @@ bool RpcSession::shutdownAndWait(bool wait) {
        LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
        mShutdownListener->waitForShutdown(_l, sp<RpcSession>::fromExisting(this));

        LOG_ALWAYS_FATAL_IF(!mThreadState.mThreads.empty(), "Shutdown failed");
        LOG_ALWAYS_FATAL_IF(!mConnections.mThreads.empty(), "Shutdown failed");
    }

    _l.unlock();
@@ -263,11 +261,11 @@ void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() {

void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock,
                                                          const sp<RpcSession>& session) {
    while (session->mThreadState.mIncomingConnections.size() > 0) {
    while (session->mConnections.mIncoming.size() > 0) {
        if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) {
            ALOGE("Waiting for RpcSession to shut down (1s w/o progress): %zu incoming connections "
                  "still.",
                  session->mThreadState.mIncomingConnections.size());
                  session->mConnections.mIncoming.size());
        }
    }
}
@@ -277,7 +275,7 @@ void RpcSession::preJoinThreadOwnership(std::thread thread) {

    {
        std::lock_guard<std::mutex> _l(mMutex);
        mThreadState.mThreads[thread.get_id()] = std::move(thread);
        mConnections.mThreads[thread.get_id()] = std::move(thread);
    }
}

@@ -380,10 +378,10 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult
    sp<RpcSession::EventListener> listener;
    {
        std::lock_guard<std::mutex> _l(session->mMutex);
        auto it = session->mThreadState.mThreads.find(std::this_thread::get_id());
        LOG_ALWAYS_FATAL_IF(it == session->mThreadState.mThreads.end());
        auto it = session->mConnections.mThreads.find(std::this_thread::get_id());
        LOG_ALWAYS_FATAL_IF(it == session->mConnections.mThreads.end());
        it->second.detach();
        session->mThreadState.mThreads.erase(it);
        session->mConnections.mThreads.erase(it);

        listener = session->mEventListener.promote();
    }
@@ -414,9 +412,9 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector<
                                                              bool incoming)>& connectAndInit) {
    {
        std::lock_guard<std::mutex> _l(mMutex);
        LOG_ALWAYS_FATAL_IF(mThreadState.mOutgoingConnections.size() != 0,
        LOG_ALWAYS_FATAL_IF(mConnections.mOutgoing.size() != 0,
                            "Must only setup session once, but already has %zu clients",
                            mThreadState.mOutgoingConnections.size());
                            mConnections.mOutgoing.size());
    }

    if (auto status = initShutdownTrigger(); status != OK) return status;
@@ -439,7 +437,7 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector<
        // downgrade again
        mProtocolVersion = oldProtocolVersion;

        mThreadState = {};
        mConnections = {};
    });

    if (status_t status = connectAndInit({}, false /*incoming*/); status != OK) return status;
@@ -662,7 +660,7 @@ status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTran
        std::lock_guard<std::mutex> _l(mMutex);
        connection->rpcTransport = std::move(rpcTransport);
        connection->exclusiveTid = gettid();
        mThreadState.mOutgoingConnections.push_back(connection);
        mConnections.mOutgoing.push_back(connection);
    }

    status_t status = OK;
@@ -699,9 +697,9 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
        std::unique_ptr<RpcTransport> rpcTransport) {
    std::lock_guard<std::mutex> _l(mMutex);

    if (mThreadState.mIncomingConnections.size() >= mMaxThreads) {
    if (mConnections.mIncoming.size() >= mMaxThreads) {
        ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)",
              mThreadState.mIncomingConnections.size(), mMaxThreads);
              mConnections.mIncoming.size(), mMaxThreads);
        return nullptr;
    }

@@ -709,7 +707,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
    // happens when new connections are still being established as part of a
    // very short-lived session which shuts down after it already started
    // accepting new connections.
    if (mThreadState.mIncomingConnections.size() < mThreadState.mMaxIncomingConnections) {
    if (mConnections.mIncoming.size() < mConnections.mMaxIncoming) {
        return nullptr;
    }

@@ -717,19 +715,19 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
    session->rpcTransport = std::move(rpcTransport);
    session->exclusiveTid = gettid();

    mThreadState.mIncomingConnections.push_back(session);
    mThreadState.mMaxIncomingConnections = mThreadState.mIncomingConnections.size();
    mConnections.mIncoming.push_back(session);
    mConnections.mMaxIncoming = mConnections.mIncoming.size();

    return session;
}

bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) {
    std::unique_lock<std::mutex> _l(mMutex);
    if (auto it = std::find(mThreadState.mIncomingConnections.begin(),
                            mThreadState.mIncomingConnections.end(), connection);
        it != mThreadState.mIncomingConnections.end()) {
        mThreadState.mIncomingConnections.erase(it);
        if (mThreadState.mIncomingConnections.size() == 0) {
    if (auto it =
                std::find(mConnections.mIncoming.begin(), mConnections.mIncoming.end(), connection);
        it != mConnections.mIncoming.end()) {
        mConnections.mIncoming.erase(it);
        if (mConnections.mIncoming.size() == 0) {
            sp<EventListener> listener = mEventListener.promote();
            if (listener) {
                _l.unlock();
@@ -754,7 +752,7 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
    pid_t tid = gettid();
    std::unique_lock<std::mutex> _l(session->mMutex);

    session->mThreadState.mWaitingThreads++;
    session->mConnections.mWaitingThreads++;
    while (true) {
        sp<RpcConnection> exclusive;
        sp<RpcConnection> available;
@@ -762,11 +760,11 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
        // CHECK FOR DEDICATED CLIENT SOCKET
        //
        // A server/looper should always use a dedicated connection if available
        findConnection(tid, &exclusive, &available, session->mThreadState.mOutgoingConnections,
                       session->mThreadState.mOutgoingConnectionsOffset);
        findConnection(tid, &exclusive, &available, session->mConnections.mOutgoing,
                       session->mConnections.mOutgoingOffset);

        // WARNING: this assumes a server cannot request its client to send
        // a transaction, as mIncomingConnections is excluded below.
        // a transaction, as mIncoming is excluded below.
        //
        // Imagine we have more than one thread in play, and a single thread
        // sends a synchronous, then an asynchronous command. Imagine the
@@ -776,9 +774,8 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
        // command. So, we move to considering the second available thread
        // for subsequent calls.
        if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) {
            session->mThreadState.mOutgoingConnectionsOffset =
                    (session->mThreadState.mOutgoingConnectionsOffset + 1) %
                    session->mThreadState.mOutgoingConnections.size();
            session->mConnections.mOutgoingOffset = (session->mConnections.mOutgoingOffset + 1) %
                    session->mConnections.mOutgoing.size();
        }

        // USE SERVING SOCKET (e.g. nested transaction)
@@ -786,7 +783,7 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
            sp<RpcConnection> exclusiveIncoming;
            // server connections are always assigned to a thread
            findConnection(tid, &exclusiveIncoming, nullptr /*available*/,
                           session->mThreadState.mIncomingConnections, 0 /* index hint */);
                           session->mConnections.mIncoming, 0 /* index hint */);

            // asynchronous calls cannot be nested, we currently allow ref count
            // calls to be nested (so that you can use this without having extra
@@ -815,20 +812,20 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
            break;
        }

        if (session->mThreadState.mOutgoingConnections.size() == 0) {
        if (session->mConnections.mOutgoing.size() == 0) {
            ALOGE("Session has no client connections. This is required for an RPC server to make "
                  "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server "
                  "connections: %zu",
                  static_cast<int>(use), session->mThreadState.mIncomingConnections.size());
                  static_cast<int>(use), session->mConnections.mIncoming.size());
            return WOULD_BLOCK;
        }

        LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
                       session->mThreadState.mOutgoingConnections.size(),
                       session->mThreadState.mIncomingConnections.size());
                       session->mConnections.mOutgoing.size(),
                       session->mConnections.mIncoming.size());
        session->mAvailableConnectionCv.wait(_l);
    }
    session->mThreadState.mWaitingThreads--;
    session->mConnections.mWaitingThreads--;

    return OK;
}
@@ -867,7 +864,7 @@ RpcSession::ExclusiveConnection::~ExclusiveConnection() {
    if (!mReentrant && mConnection != nullptr) {
        std::unique_lock<std::mutex> _l(mSession->mMutex);
        mConnection->exclusiveTid = std::nullopt;
        if (mSession->mThreadState.mWaitingThreads > 0) {
        if (mSession->mConnections.mWaitingThreads > 0) {
            _l.unlock();
            mSession->mAvailableConnectionCv.notify_one();
        }
+9 −9
Original line number Diff line number Diff line
@@ -281,13 +281,13 @@ private:

    const std::unique_ptr<RpcTransportCtx> mCtx;

    // On the other side of a session, for each of mOutgoingConnections here, there should
    // be one of mIncomingConnections on the other side (and vice versa).
    // On the other side of a session, for each of mOutgoing here, there should
    // be one of mIncoming on the other side (and vice versa).
    //
    // For the simplest session, a single server with one client, you would
    // have:
    //  - the server has a single 'mIncomingConnections' and a thread listening on this
    //  - the client has a single 'mOutgoingConnections' and makes calls to this
    //  - the server has a single 'mIncoming' and a thread listening on this
    //  - the client has a single 'mOutgoing' and makes calls to this
    //  - here, when the client makes a call, the server can call back into it
    //    (nested calls), but outside of this, the client will only ever read
    //    calls from the server when it makes a call itself.
@@ -315,12 +315,12 @@ private:
    struct ThreadState {
        size_t mWaitingThreads = 0;
        // hint index into clients, ++ when sending an async transaction
        size_t mOutgoingConnectionsOffset = 0;
        std::vector<sp<RpcConnection>> mOutgoingConnections;
        size_t mMaxIncomingConnections = 0;
        std::vector<sp<RpcConnection>> mIncomingConnections;
        size_t mOutgoingOffset = 0;
        std::vector<sp<RpcConnection>> mOutgoing;
        size_t mMaxIncoming = 0;
        std::vector<sp<RpcConnection>> mIncoming;
        std::map<std::thread::id, std::thread> mThreads;
    } mThreadState;
    } mConnections;
};

} // namespace android