Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 75047ca4 authored by Steven Moreland's avatar Steven Moreland
Browse files

RPC Binder: dropping all binders drops session

Previously, RPC binder was prone to leaks when you have
an incoming threadpool. Even if you dropped all references
to all binders, you still needed to manually call
shutdownAndWait.

Now, when you drop the last binder for a connection (after
you call getRootBinder the first time), it will automatically
shutdown the connection.

Previously, when this situation would happen, you could allow
a session to call getRootBinder again. However, we don't expect
or need this, so we can change the behavior.

Bug: 271830568
Test: binderRpcTest (+= 5s)
Change-Id: I2033f0f2908e238c0b96060489f72780b51a4bbe
parent 69e22e9e
Loading
Loading
Loading
Loading
+31 −7
Original line number Diff line number Diff line
@@ -262,8 +262,10 @@ void RpcState::dump() {
}

void RpcState::clear() {
    RpcMutexUniqueLock _l(mNodeMutex);
    return clear(RpcMutexUniqueLock(mNodeMutex));
}

void RpcState::clear(RpcMutexUniqueLock nodeLock) {
    if (mTerminated) {
        LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
                            "New state should be impossible after terminating!");
@@ -292,7 +294,7 @@ void RpcState::clear() {
    auto temp = std::move(mNodeForAddress);
    mNodeForAddress.clear(); // RpcState isn't reusable, but for future/explicit

    _l.unlock();
    nodeLock.unlock();
    temp.clear(); // explicit
}

@@ -704,7 +706,7 @@ status_t RpcState::sendDecStrongToTarget(const sp<RpcSession::RpcConnection>& co
    };

    {
        RpcMutexLockGuard _l(mNodeMutex);
        RpcMutexUniqueLock _l(mNodeMutex);
        if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
        auto it = mNodeForAddress.find(addr);
        LOG_ALWAYS_FATAL_IF(it == mNodeForAddress.end(),
@@ -720,8 +722,9 @@ status_t RpcState::sendDecStrongToTarget(const sp<RpcSession::RpcConnection>& co
        body.amount = it->second.timesRecd - target;
        it->second.timesRecd = target;

        LOG_ALWAYS_FATAL_IF(nullptr != tryEraseNode(it),
        LOG_ALWAYS_FATAL_IF(nullptr != tryEraseNode(session, std::move(_l), it),
                            "Bad state. RpcState shouldn't own received binder");
        // LOCK ALREADY RELEASED
    }

    RpcWireHeader cmd = {
@@ -1164,8 +1167,8 @@ status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connect
                   it->second.timesSent);

    it->second.timesSent -= body.amount;
    sp<IBinder> tempHold = tryEraseNode(it);
    _l.unlock();
    sp<IBinder> tempHold = tryEraseNode(session, std::move(_l), it);
    // LOCK ALREADY RELEASED
    tempHold = nullptr; // destructor may make binder calls on this session

    return OK;
@@ -1229,7 +1232,10 @@ status_t RpcState::validateParcel(const sp<RpcSession>& session, const Parcel& p
    return OK;
}

sp<IBinder> RpcState::tryEraseNode(std::map<uint64_t, BinderNode>::iterator& it) {
sp<IBinder> RpcState::tryEraseNode(const sp<RpcSession>& session, RpcMutexUniqueLock nodeLock,
                                   std::map<uint64_t, BinderNode>::iterator& it) {
    bool shouldShutdown = false;

    sp<IBinder> ref;

    if (it->second.timesSent == 0) {
@@ -1239,8 +1245,26 @@ sp<IBinder> RpcState::tryEraseNode(std::map<uint64_t, BinderNode>::iterator& it)
            LOG_ALWAYS_FATAL_IF(!it->second.asyncTodo.empty(),
                                "Can't delete binder w/ pending async transactions");
            mNodeForAddress.erase(it);

            if (mNodeForAddress.size() == 0) {
                shouldShutdown = true;
            }
        }
    }

    // If we shutdown, prevent RpcState from being re-used. This prevents another
    // thread from getting the root object again.
    if (shouldShutdown) {
        clear(std::move(nodeLock));
    } else {
        nodeLock.unlock(); // explicit
    }
    // LOCK IS RELEASED

    if (shouldShutdown) {
        ALOGI("RpcState has no binders left, so triggering shutdown...");
        (void)session->shutdownAndWait(false);
    }

    return ref;
}
+15 −5
Original line number Diff line number Diff line
@@ -168,6 +168,7 @@ public:
    void clear();

private:
    void clear(RpcMutexUniqueLock nodeLock);
    void dumpLocked();

    // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
@@ -268,11 +269,20 @@ private:
        std::string toString() const;
    };

    // checks if there is any reference left to a node and erases it. If erase
    // happens, and there is a strong reference to the binder kept by
    // binderNode, this returns that strong reference, so that it can be
    // dropped after any locks are removed.
    sp<IBinder> tryEraseNode(std::map<uint64_t, BinderNode>::iterator& it);
    // Checks if there is any reference left to a node and erases it. If this
    // is the last node, shuts down the session.
    //
    // Node lock is passed here for convenience, so that we can release it
    // and terminate the session, but we could leave it up to the caller
    // by returning a continuation if we needed to erase multiple specific
    // nodes. It may be tempting to allow the client to keep on holding the
    // lock and instead just return whether or not we should shutdown, but
    // this introduces the possibility that another thread calls
    // getRootBinder and thinks it is valid, rather than immediately getting
    // an error.
    sp<IBinder> tryEraseNode(const sp<RpcSession>& session, RpcMutexUniqueLock nodeLock,
                             std::map<uint64_t, BinderNode>::iterator& it);

    // true - success
    // false - session shutdown, halt
    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node);
+3 −0
Original line number Diff line number Diff line
@@ -51,6 +51,9 @@ constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_RPC_HEADER_FEATURE_EXPLICIT_PARCEL_
 * This represents a session (group of connections) between a client
 * and a server. Multiple connections are needed for multiple parallel "binder"
 * calls which may also have nested calls.
 *
 * Once a binder exists in the session, if all references to all binders are dropped,
 * the session shuts down.
 */
class RpcSession final : public virtual RefBase {
public:
+6 −2
Original line number Diff line number Diff line
@@ -102,9 +102,11 @@ std::unique_ptr<RpcTransportCtxFactory> makeFactoryTls() {
}

static sp<RpcSession> gSession = RpcSession::make();
static sp<IBinder> gRpcBinder;
// Certificate validation happens during handshake and does not affect the result of benchmarks.
// Skip certificate validation to simplify the setup process.
static sp<RpcSession> gSessionTls = RpcSession::make(makeFactoryTls());
static sp<IBinder> gRpcTlsBinder;
#ifdef __BIONIC__
static const String16 kKernelBinderInstance = String16(u"binderRpcBenchmark-control");
static sp<IBinder> gKernelBinder;
@@ -118,9 +120,9 @@ static sp<IBinder> getBinderForOptions(benchmark::State& state) {
            return gKernelBinder;
#endif
        case RPC:
            return gSession->getRootObject();
            return gRpcBinder;
        case RPC_TLS:
            return gSessionTls->getRootObject();
            return gRpcTlsBinder;
        default:
            LOG(FATAL) << "Unknown transport value: " << transport;
            return nullptr;
@@ -254,11 +256,13 @@ int main(int argc, char** argv) {
    (void)unlink(addr.c_str());
    forkRpcServer(addr.c_str(), RpcServer::make(RpcTransportCtxFactoryRaw::make()));
    setupClient(gSession, addr.c_str());
    gRpcBinder = gSession->getRootObject();

    std::string tlsAddr = tmp + "/binderRpcTlsBenchmark";
    (void)unlink(tlsAddr.c_str());
    forkRpcServer(tlsAddr.c_str(), RpcServer::make(makeFactoryTls()));
    setupClient(gSessionTls, tlsAddr.c_str());
    gRpcTlsBinder = gSessionTls->getRootObject();

    ::benchmark::RunSpecifiedBenchmarks();
    return 0;
+50 −15
Original line number Diff line number Diff line
@@ -163,7 +163,8 @@ public:
            session.root = nullptr;
        }

        for (auto& info : sessions) {
        for (size_t sessionNum = 0; sessionNum < sessions.size(); sessionNum++) {
            auto& info = sessions.at(sessionNum);
            sp<RpcSession>& session = info.session;

            EXPECT_NE(nullptr, session);
@@ -179,6 +180,7 @@ public:
            for (size_t i = 0; i < 3; i++) {
                sp<RpcSession> strongSession = weakSession.promote();
                EXPECT_EQ(nullptr, strongSession)
                        << "For session " << sessionNum << ". "
                        << (debugBacktrace(host.getPid()), debugBacktrace(getpid()),
                            "Leaked sess: ")
                        << strongSession->getStrongCount() << " checked time " << i;
@@ -254,6 +256,10 @@ std::unique_ptr<ProcessSession> BinderRpc::createRpcTestSocketServerProcessEtc(
        const BinderRpcOptions& options) {
    CHECK_GE(options.numSessions, 1) << "Must have at least one session to a server";

    if (options.numIncomingConnectionsBySession.size() != 0) {
        CHECK_EQ(options.numIncomingConnectionsBySession.size(), options.numSessions);
    }

    SocketType socketType = std::get<0>(GetParam());
    RpcSecurity rpcSecurity = std::get<1>(GetParam());
    uint32_t clientVersion = std::get<2>(GetParam());
@@ -351,9 +357,15 @@ std::unique_ptr<ProcessSession> BinderRpc::createRpcTestSocketServerProcessEtc(

    status_t status;

    for (const auto& session : sessions) {
    for (size_t i = 0; i < sessions.size(); i++) {
        const auto& session = sessions.at(i);

        size_t numIncoming = options.numIncomingConnectionsBySession.size() > 0
                ? options.numIncomingConnectionsBySession.at(i)
                : 0;

        CHECK(session->setProtocolVersion(clientVersion));
        session->setMaxIncomingThreads(options.numIncomingConnections);
        session->setMaxIncomingThreads(numIncoming);
        session->setMaxOutgoingConnections(options.numOutgoingConnections);
        session->setFileDescriptorTransportMode(options.clientFileDescriptorTransportMode);

@@ -659,6 +671,32 @@ TEST_P(BinderRpc, OnewayCallExhaustion) {
    proc.proc->sessions.erase(proc.proc->sessions.begin() + 1);
}

TEST_P(BinderRpc, SessionWithIncomingThreadpoolDoesntLeak) {
    if (clientOrServerSingleThreaded()) {
        GTEST_SKIP() << "This test requires multiple threads";
    }

    // session 0 - will check for leaks in destructor of proc
    // session 1 - we want to make sure it gets deleted when we drop all references to it
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numIncomingConnectionsBySession = {0, 1}, .numSessions = 2});

    wp<RpcSession> session = proc.proc->sessions.at(1).session;

    // remove all references to the second session
    proc.proc->sessions.at(1).root = nullptr;
    proc.proc->sessions.erase(proc.proc->sessions.begin() + 1);

    // TODO(b/271830568) more efficient way to wait for other incoming threadpool
    // to drain commands.
    for (size_t i = 0; i < 100; i++) {
        usleep(10 * 1000);
        if (session.promote() == nullptr) break;
    }

    EXPECT_EQ(nullptr, session.promote());
}

TEST_P(BinderRpc, SingleDeathRecipient) {
    if (clientOrServerSingleThreaded()) {
        GTEST_SKIP() << "This test requires multiple threads";
@@ -676,7 +714,7 @@ TEST_P(BinderRpc, SingleDeathRecipient) {

    // Death recipient needs to have an incoming connection to be called
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numSessions = 1, .numIncomingConnections = 1});
            {.numThreads = 1, .numSessions = 1, .numIncomingConnectionsBySession = {1}});

    auto dr = sp<MyDeathRec>::make();
    ASSERT_EQ(OK, proc.rootBinder->linkToDeath(dr, (void*)1, 0));
@@ -689,6 +727,10 @@ TEST_P(BinderRpc, SingleDeathRecipient) {
    ASSERT_TRUE(dr->mCv.wait_for(lock, 100ms, [&]() { return dr->dead; }));

    // need to wait for the session to shutdown so we don't "Leak session"
    // can't do this before checking the death recipient by calling
    // forceShutdown earlier, because shutdownAndWait will also trigger
    // a death recipient, but if we had a way to wait for the service
    // to gracefully shutdown, we could use that here.
    EXPECT_TRUE(proc.proc->sessions.at(0).session->shutdownAndWait(true));
    proc.expectAlreadyShutdown = true;
}
@@ -710,7 +752,7 @@ TEST_P(BinderRpc, SingleDeathRecipientOnShutdown) {

    // Death recipient needs to have an incoming connection to be called
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numSessions = 1, .numIncomingConnections = 1});
            {.numThreads = 1, .numSessions = 1, .numIncomingConnectionsBySession = {1}});

    auto dr = sp<MyDeathRec>::make();
    EXPECT_EQ(OK, proc.rootBinder->linkToDeath(dr, (void*)1, 0));
@@ -743,8 +785,7 @@ TEST_P(BinderRpc, DeathRecipientFailsWithoutIncoming) {
        void binderDied(const wp<IBinder>& /* who */) override {}
    };

    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numSessions = 1, .numIncomingConnections = 0});
    auto proc = createRpcTestSocketServerProcess({.numThreads = 1, .numSessions = 1});

    auto dr = sp<MyDeathRec>::make();
    EXPECT_EQ(INVALID_OPERATION, proc.rootBinder->linkToDeath(dr, (void*)1, 0));
@@ -763,19 +804,13 @@ TEST_P(BinderRpc, UnlinkDeathRecipient) {

    // Death recipient needs to have an incoming connection to be called
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numSessions = 1, .numIncomingConnections = 1});
            {.numThreads = 1, .numSessions = 1, .numIncomingConnectionsBySession = {1}});

    auto dr = sp<MyDeathRec>::make();
    ASSERT_EQ(OK, proc.rootBinder->linkToDeath(dr, (void*)1, 0));
    ASSERT_EQ(OK, proc.rootBinder->unlinkToDeath(dr, (void*)1, 0, nullptr));

    if (auto status = proc.rootIface->scheduleShutdown(); !status.isOk()) {
        EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
    }

    // need to wait for the session to shutdown so we don't "Leak session"
    EXPECT_TRUE(proc.proc->sessions.at(0).session->shutdownAndWait(true));
    proc.expectAlreadyShutdown = true;
    proc.forceShutdown();
}

TEST_P(BinderRpc, Die) {
Loading