Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ffa3aaac authored by Andrei Homescu's avatar Andrei Homescu
Browse files

libbinder: add build option for single-threaded RPC

Trusty does not support threading. This adds a build option
to disable mutexes and other threading code from RpcState,
RpcSession, and RpcServer.

Bug: 224644083
Test: build Trusty
Change-Id: Iaa78caca1ddee45be7c2def2755598decc0d4d15
parent 992a405f
Loading
Loading
Loading
Loading
+25 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

namespace android {

// Compile-time flag mirroring the BINDER_RPC_SINGLE_THREADED build option:
// true on normal multi-threaded builds, false on targets (e.g. Trusty) that
// have no threading support and must run the RPC code single-threaded.
#ifndef BINDER_RPC_SINGLE_THREADED
constexpr bool kEnableRpcThreads = true;
#else
constexpr bool kEnableRpcThreads = false;
#endif

} // namespace android
+24 −2
Original line number Diff line number Diff line
@@ -28,25 +28,45 @@ namespace android {

std::unique_ptr<FdTrigger> FdTrigger::make() {
    auto ret = std::make_unique<FdTrigger>();
#ifndef BINDER_RPC_SINGLE_THREADED
    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
        ALOGE("Could not create pipe %s", strerror(errno));
        return nullptr;
    }
#endif
    return ret;
}

// Fires the trigger. Multi-threaded builds close the write end of the pipe,
// which wakes any poll() blocked on the read end with POLLHUP; the
// single-threaded build just records the state in a flag that is checked
// before each poll.
void FdTrigger::trigger() {
#ifndef BINDER_RPC_SINGLE_THREADED
    mWrite.reset();
#else
    mTriggered = true;
#endif
}

// Reports whether trigger() has been called: on multi-threaded builds the
// write fd having been reset (== -1) is the signal; single-threaded builds
// read the flag directly.
bool FdTrigger::isTriggered() {
#ifndef BINDER_RPC_SINGLE_THREADED
    return mWrite == -1;
#else
    return mTriggered;
#endif
}

status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
#ifdef BINDER_RPC_SINGLE_THREADED
    if (mTriggered) {
        return DEAD_OBJECT;
    }
#endif

    LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed", fd.get());
    pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
                 {.fd = mRead.get(), .events = 0, .revents = 0}};
    pollfd pfd[]{
            {.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
#ifndef BINDER_RPC_SINGLE_THREADED
            {.fd = mRead.get(), .events = 0, .revents = 0},
#endif
    };
    int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
    if (ret < 0) {
        return -errno;
@@ -55,6 +75,7 @@ status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {

    // At least one FD has events. Check them.

#ifndef BINDER_RPC_SINGLE_THREADED
    // Detect explicit trigger(): DEAD_OBJECT
    if (pfd[1].revents & POLLHUP) {
        return DEAD_OBJECT;
@@ -68,6 +89,7 @@ status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {

    // pfd[1].revents is 0, hence pfd[0].revents must be set, and only possible values are
    // a subset of event | POLLHUP | POLLERR | POLLNVAL.
#endif

    // POLLNVAL: invalid FD number, e.g. not opened.
    if (pfd[0].revents & POLLNVAL) {
+4 −0
Original line number Diff line number Diff line
@@ -55,7 +55,11 @@ public:
    [[nodiscard]] status_t triggerablePoll(base::borrowed_fd fd, int16_t event);

private:
#ifdef BINDER_RPC_SINGLE_THREADED
    bool mTriggered = false;
#else
    base::unique_fd mWrite;
    base::unique_fd mRead;
#endif
};
} // namespace android
+42 −26
Original line number Diff line number Diff line
@@ -32,6 +32,7 @@
#include <log/log.h>
#include <utils/Compat.h>

#include "BuildFlags.h"
#include "FdTrigger.h"
#include "RpcSocketAddress.h"
#include "RpcState.h"
@@ -131,27 +132,27 @@ void RpcServer::setSupportedFileDescriptorTransportModes(
}

void RpcServer::setRootObject(const sp<IBinder>& binder) {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    mRootObjectFactory = nullptr;
    mRootObjectWeak = mRootObject = binder;
}

void RpcServer::setRootObjectWeak(const wp<IBinder>& binder) {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    mRootObject.clear();
    mRootObjectFactory = nullptr;
    mRootObjectWeak = binder;
}
void RpcServer::setPerSessionRootObject(
        std::function<sp<IBinder>(const void*, size_t)>&& makeObject) {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    mRootObject.clear();
    mRootObjectWeak.clear();
    mRootObjectFactory = std::move(makeObject);
}

sp<IBinder> RpcServer::getRootObject() {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    bool hasWeak = mRootObjectWeak.unsafe_get();
    sp<IBinder> ret = mRootObjectWeak.promote();
    ALOGW_IF(hasWeak && ret == nullptr, "RpcServer root object is freed, returning nullptr");
@@ -159,7 +160,7 @@ sp<IBinder> RpcServer::getRootObject() {
}

std::vector<uint8_t> RpcServer::getCertificate(RpcCertificateFormat format) {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    return mCtx->getCertificate(format);
}

@@ -168,15 +169,17 @@ static void joinRpcServer(sp<RpcServer>&& thiz) {
}

void RpcServer::start() {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    LOG_ALWAYS_FATAL_IF(mJoinThread.get(), "Already started!");
    mJoinThread = std::make_unique<std::thread>(&joinRpcServer, sp<RpcServer>::fromExisting(this));
    mJoinThread =
            std::make_unique<RpcMaybeThread>(&joinRpcServer, sp<RpcServer>::fromExisting(this));
    rpcJoinIfSingleThreaded(*mJoinThread);
}

void RpcServer::join() {

    {
        std::lock_guard<std::mutex> _l(mLock);
        RpcMutexLockGuard _l(mLock);
        LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
        LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
        mJoinThreadRunning = true;
@@ -204,24 +207,31 @@ void RpcServer::join() {
        LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());

        {
            std::lock_guard<std::mutex> _l(mLock);
            std::thread thread =
                    std::thread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this),
            RpcMutexLockGuard _l(mLock);
            RpcMaybeThread thread = RpcMaybeThread(&RpcServer::establishConnection,
                                                   sp<RpcServer>::fromExisting(this),
                                                   std::move(clientFd), addr, addrLen);
            mConnectingThreads[thread.get_id()] = std::move(thread);

            auto& threadRef = mConnectingThreads[thread.get_id()];
            threadRef = std::move(thread);
            rpcJoinIfSingleThreaded(threadRef);
        }
    }
    LOG_RPC_DETAIL("RpcServer::join exiting with %s", statusToString(status).c_str());

    {
        std::lock_guard<std::mutex> _l(mLock);
    if constexpr (kEnableRpcThreads) {
        RpcMutexLockGuard _l(mLock);
        mJoinThreadRunning = false;
    } else {
        // Multi-threaded builds clear this in shutdown(), but we need it valid
        // so the loop above exits cleanly
        mShutdownTrigger = nullptr;
    }
    mShutdownCv.notify_all();
}

bool RpcServer::shutdown() {
    std::unique_lock<std::mutex> _l(mLock);
    RpcMutexUniqueLock _l(mLock);
    if (mShutdownTrigger == nullptr) {
        LOG_RPC_DETAIL("Cannot shutdown. No shutdown trigger installed (already shutdown?)");
        return false;
@@ -232,10 +242,16 @@ bool RpcServer::shutdown() {
    for (auto& [id, session] : mSessions) {
        (void)id;
        // server lock is a more general lock
        std::lock_guard<std::mutex> _lSession(session->mMutex);
        RpcMutexLockGuard _lSession(session->mMutex);
        session->mShutdownTrigger->trigger();
    }

    if constexpr (!kEnableRpcThreads) {
        // In single-threaded mode we're done here, everything else that
        // needs to happen should be at the end of RpcServer::join()
        return true;
    }

    while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
        if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
            ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
@@ -263,7 +279,7 @@ bool RpcServer::shutdown() {
}

std::vector<sp<RpcSession>> RpcServer::listSessions() {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    std::vector<sp<RpcSession>> sessions;
    for (auto& [id, session] : mSessions) {
        (void)id;
@@ -273,7 +289,7 @@ std::vector<sp<RpcSession>> RpcServer::listSessions() {
}

size_t RpcServer::numUninitializedSessions() {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    return mConnectingThreads.size();
}

@@ -354,12 +370,12 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        }
    }

    std::thread thisThread;
    RpcMaybeThread thisThread;
    sp<RpcSession> session;
    {
        std::unique_lock<std::mutex> _l(server->mLock);
        RpcMutexUniqueLock _l(server->mLock);

        auto threadId = server->mConnectingThreads.find(std::this_thread::get_id());
        auto threadId = server->mConnectingThreads.find(rpc_this_thread::get_id());
        LOG_ALWAYS_FATAL_IF(threadId == server->mConnectingThreads.end(),
                            "Must establish connection on owned thread");
        thisThread = std::move(threadId->second);
@@ -505,7 +521,7 @@ void RpcServer::onSessionAllIncomingThreadsEnded(const sp<RpcSession>& session)
    LOG_RPC_DETAIL("Dropping session with address %s",
                   base::HexString(id.data(), id.size()).c_str());

    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    auto it = mSessions.find(id);
    LOG_ALWAYS_FATAL_IF(it == mSessions.end(), "Bad state, unknown session id %s",
                        base::HexString(id.data(), id.size()).c_str());
@@ -519,17 +535,17 @@ void RpcServer::onSessionIncomingThreadEnded() {
}

bool RpcServer::hasServer() {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    return mServer.ok();
}

unique_fd RpcServer::releaseServer() {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    return std::move(mServer);
}

status_t RpcServer::setupExternalServer(base::unique_fd serverFd) {
    std::lock_guard<std::mutex> _l(mLock);
    RpcMutexLockGuard _l(mLock);
    if (mServer.ok()) {
        ALOGE("Each RpcServer can only have one server.");
        return INVALID_OPERATION;
+36 −31
Original line number Diff line number Diff line
@@ -21,7 +21,6 @@
#include <dlfcn.h>
#include <inttypes.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>

#include <string_view>
@@ -60,7 +59,7 @@ RpcSession::RpcSession(std::unique_ptr<RpcTransportCtx> ctx) : mCtx(std::move(ct
RpcSession::~RpcSession() {
    LOG_RPC_DETAIL("RpcSession destroyed %p", this);

    std::lock_guard<std::mutex> _l(mMutex);
    RpcMutexLockGuard _l(mMutex);
    LOG_ALWAYS_FATAL_IF(mConnections.mIncoming.size() != 0,
                        "Should not be able to destroy a session with servers in use.");
}
@@ -77,7 +76,7 @@ sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTrans
}

void RpcSession::setMaxIncomingThreads(size_t threads) {
    std::lock_guard<std::mutex> _l(mMutex);
    RpcMutexLockGuard _l(mMutex);
    LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(),
                        "Must set max incoming threads before setting up connections, but has %zu "
                        "client(s) and %zu server(s)",
@@ -86,12 +85,12 @@ void RpcSession::setMaxIncomingThreads(size_t threads) {
}

size_t RpcSession::getMaxIncomingThreads() {
    std::lock_guard<std::mutex> _l(mMutex);
    RpcMutexLockGuard _l(mMutex);
    return mMaxIncomingThreads;
}

void RpcSession::setMaxOutgoingThreads(size_t threads) {
    std::lock_guard<std::mutex> _l(mMutex);
    RpcMutexLockGuard _l(mMutex);
    LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(),
                        "Must set max outgoing threads before setting up connections, but has %zu "
                        "client(s) and %zu server(s)",
@@ -100,7 +99,7 @@ void RpcSession::setMaxOutgoingThreads(size_t threads) {
}

size_t RpcSession::getMaxOutgoingThreads() {
    std::lock_guard<std::mutex> _l(mMutex);
    RpcMutexLockGuard _l(mMutex);
    return mMaxOutgoingThreads;
}

@@ -113,7 +112,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) {
        return false;
    }

    std::lock_guard<std::mutex> _l(mMutex);
    RpcMutexLockGuard _l(mMutex);
    if (mProtocolVersion && version > *mProtocolVersion) {
        ALOGE("Cannot upgrade explicitly capped protocol version %u to newer version %u",
              *mProtocolVersion, version);
@@ -125,7 +124,7 @@ bool RpcSession::setProtocolVersion(uint32_t version) {
}

std::optional<uint32_t> RpcSession::getProtocolVersion() {
    std::lock_guard<std::mutex> _l(mMutex);
    RpcMutexLockGuard _l(mMutex);
    return mProtocolVersion;
}

@@ -209,7 +208,7 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
}

bool RpcSession::shutdownAndWait(bool wait) {
    std::unique_lock<std::mutex> _l(mMutex);
    RpcMutexUniqueLock _l(mMutex);
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");

    mShutdownTrigger->trigger();
@@ -222,6 +221,7 @@ bool RpcSession::shutdownAndWait(bool wait) {
    }

    _l.unlock();

    mRpcBinderState->clear();

    return true;
@@ -256,7 +256,7 @@ status_t RpcSession::sendDecStrongToTarget(uint64_t address, size_t target) {

status_t RpcSession::readId() {
    {
        std::lock_guard<std::mutex> _l(mMutex);
        RpcMutexLockGuard _l(mMutex);
        LOG_ALWAYS_FATAL_IF(mForServer != nullptr, "Can only update ID for client.");
    }

@@ -282,7 +282,7 @@ void RpcSession::WaitForShutdownListener::onSessionIncomingThreadEnded() {
    mCv.notify_all();
}

void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::mutex>& lock,
void RpcSession::WaitForShutdownListener::waitForShutdown(RpcMutexUniqueLock& lock,
                                                          const sp<RpcSession>& session) {
    while (session->mConnections.mIncoming.size() > 0) {
        if (std::cv_status::timeout == mCv.wait_for(lock, std::chrono::seconds(1))) {
@@ -293,11 +293,11 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::
    }
}

void RpcSession::preJoinThreadOwnership(std::thread thread) {
    LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");
void RpcSession::preJoinThreadOwnership(RpcMaybeThread thread) {
    LOG_ALWAYS_FATAL_IF(thread.get_id() != rpc_this_thread::get_id(), "Must own this thread");

    {
        std::lock_guard<std::mutex> _l(mMutex);
        RpcMutexLockGuard _l(mMutex);
        mConnections.mThreads[thread.get_id()] = std::move(thread);
    }
}
@@ -404,8 +404,8 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult

    sp<RpcSession::EventListener> listener;
    {
        std::lock_guard<std::mutex> _l(session->mMutex);
        auto it = session->mConnections.mThreads.find(std::this_thread::get_id());
        RpcMutexLockGuard _l(session->mMutex);
        auto it = session->mConnections.mThreads.find(rpc_this_thread::get_id());
        LOG_ALWAYS_FATAL_IF(it == session->mConnections.mThreads.end());
        it->second.detach();
        session->mConnections.mThreads.erase(it);
@@ -438,7 +438,7 @@ sp<RpcServer> RpcSession::server() {
status_t RpcSession::setupClient(const std::function<status_t(const std::vector<uint8_t>& sessionId,
                                                              bool incoming)>& connectAndInit) {
    {
        std::lock_guard<std::mutex> _l(mMutex);
        RpcMutexLockGuard _l(mMutex);
        LOG_ALWAYS_FATAL_IF(mConnections.mOutgoing.size() != 0,
                            "Must only setup session once, but already has %zu clients",
                            mConnections.mOutgoing.size());
@@ -500,7 +500,11 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector<
        return status;
    }

#ifdef BINDER_RPC_SINGLE_THREADED
    constexpr size_t outgoingThreads = 1;
#else  // BINDER_RPC_SINGLE_THREADED
    size_t outgoingThreads = std::min(numThreadsAvailable, mMaxOutgoingThreads);
#endif // BINDER_RPC_SINGLE_THREADED
    ALOGI_IF(outgoingThreads != numThreadsAvailable,
             "Server hints client to start %zu outgoing threads, but client will only start %zu "
             "because it is preconfigured to start at most %zu outgoing threads.",
@@ -655,14 +659,14 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_
}

status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) {
    std::mutex mutex;
    std::condition_variable joinCv;
    std::unique_lock<std::mutex> lock(mutex);
    std::thread thread;
    RpcMutex mutex;
    RpcConditionVariable joinCv;
    RpcMutexUniqueLock lock(mutex);
    RpcMaybeThread thread;
    sp<RpcSession> thiz = sp<RpcSession>::fromExisting(this);
    bool ownershipTransferred = false;
    thread = std::thread([&]() {
        std::unique_lock<std::mutex> threadLock(mutex);
    thread = RpcMaybeThread([&]() {
        RpcMutexUniqueLock threadLock(mutex);
        std::unique_ptr<RpcTransport> movedRpcTransport = std::move(rpcTransport);
        // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
        sp<RpcSession> session = thiz;
@@ -678,6 +682,7 @@ status_t RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTran

        RpcSession::join(std::move(session), std::move(setupResult));
    });
    rpcJoinIfSingleThreaded(thread);
    joinCv.wait(lock, [&] { return ownershipTransferred; });
    LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
    return OK;
@@ -697,9 +702,9 @@ status_t RpcSession::initShutdownTrigger() {
status_t RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) {
    sp<RpcConnection> connection = sp<RpcConnection>::make();
    {
        std::lock_guard<std::mutex> _l(mMutex);
        RpcMutexLockGuard _l(mMutex);
        connection->rpcTransport = std::move(rpcTransport);
        connection->exclusiveTid = base::GetThreadId();
        connection->exclusiveTid = rpcGetThreadId();
        mConnections.mOutgoing.push_back(connection);
    }

@@ -736,7 +741,7 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene

sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
        std::unique_ptr<RpcTransport> rpcTransport) {
    std::lock_guard<std::mutex> _l(mMutex);
    RpcMutexLockGuard _l(mMutex);

    if (mConnections.mIncoming.size() >= mMaxIncomingThreads) {
        ALOGE("Cannot add thread to session with %zu threads (max is set to %zu)",
@@ -754,7 +759,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(

    sp<RpcConnection> session = sp<RpcConnection>::make();
    session->rpcTransport = std::move(rpcTransport);
    session->exclusiveTid = base::GetThreadId();
    session->exclusiveTid = rpcGetThreadId();

    mConnections.mIncoming.push_back(session);
    mConnections.mMaxIncoming = mConnections.mIncoming.size();
@@ -763,7 +768,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
}

bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) {
    std::unique_lock<std::mutex> _l(mMutex);
    RpcMutexUniqueLock _l(mMutex);
    if (auto it =
                std::find(mConnections.mIncoming.begin(), mConnections.mIncoming.end(), connection);
        it != mConnections.mIncoming.end()) {
@@ -781,7 +786,7 @@ bool RpcSession::removeIncomingConnection(const sp<RpcConnection>& connection) {
}

void RpcSession::clearConnectionTid(const sp<RpcConnection>& connection) {
    std::unique_lock<std::mutex> _l(mMutex);
    RpcMutexUniqueLock _l(mMutex);
    connection->exclusiveTid = std::nullopt;
    if (mConnections.mWaitingThreads > 0) {
        _l.unlock();
@@ -799,8 +804,8 @@ status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, Co
    connection->mConnection = nullptr;
    connection->mReentrant = false;

    uint64_t tid = base::GetThreadId();
    std::unique_lock<std::mutex> _l(session->mMutex);
    uint64_t tid = rpcGetThreadId();
    RpcMutexUniqueLock _l(session->mMutex);

    session->mConnections.mWaitingThreads++;
    while (true) {
Loading