Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9b2d71cd authored by Kwangwu Lee's avatar Kwangwu Lee Committed by Automerger Merge Worker
Browse files

Merge "Revert "RPC Binder: dropping all binders drops session"" am: ca5c2fdc...

Merge "Revert "RPC Binder: dropping all binders drops session"" am: ca5c2fdc am: 881c907e am: a1173c5a

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/2488395



Change-Id: Ic64a5fbf33c05698a856b1b0c342b934dec63be6
Signed-off-by: default avatarAutomerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
parents 612b7822 a1173c5a
Loading
Loading
Loading
Loading
+7 −31
Original line number Diff line number Diff line
@@ -262,10 +262,8 @@ void RpcState::dump() {
}

void RpcState::clear() {
    return clear(RpcMutexUniqueLock(mNodeMutex));
}
    RpcMutexUniqueLock _l(mNodeMutex);

void RpcState::clear(RpcMutexUniqueLock nodeLock) {
    if (mTerminated) {
        LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
                            "New state should be impossible after terminating!");
@@ -294,7 +292,7 @@ void RpcState::clear(RpcMutexUniqueLock nodeLock) {
    auto temp = std::move(mNodeForAddress);
    mNodeForAddress.clear(); // RpcState isn't reusable, but for future/explicit

    nodeLock.unlock();
    _l.unlock();
    temp.clear(); // explicit
}

@@ -706,7 +704,7 @@ status_t RpcState::sendDecStrongToTarget(const sp<RpcSession::RpcConnection>& co
    };

    {
        RpcMutexUniqueLock _l(mNodeMutex);
        RpcMutexLockGuard _l(mNodeMutex);
        if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
        auto it = mNodeForAddress.find(addr);
        LOG_ALWAYS_FATAL_IF(it == mNodeForAddress.end(),
@@ -722,9 +720,8 @@ status_t RpcState::sendDecStrongToTarget(const sp<RpcSession::RpcConnection>& co
        body.amount = it->second.timesRecd - target;
        it->second.timesRecd = target;

        LOG_ALWAYS_FATAL_IF(nullptr != tryEraseNode(session, std::move(_l), it),
        LOG_ALWAYS_FATAL_IF(nullptr != tryEraseNode(it),
                            "Bad state. RpcState shouldn't own received binder");
        // LOCK ALREADY RELEASED
    }

    RpcWireHeader cmd = {
@@ -1167,8 +1164,8 @@ status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connect
                   it->second.timesSent);

    it->second.timesSent -= body.amount;
    sp<IBinder> tempHold = tryEraseNode(session, std::move(_l), it);
    // LOCK ALREADY RELEASED
    sp<IBinder> tempHold = tryEraseNode(it);
    _l.unlock();
    tempHold = nullptr; // destructor may make binder calls on this session

    return OK;
@@ -1232,10 +1229,7 @@ status_t RpcState::validateParcel(const sp<RpcSession>& session, const Parcel& p
    return OK;
}

sp<IBinder> RpcState::tryEraseNode(const sp<RpcSession>& session, RpcMutexUniqueLock nodeLock,
                                   std::map<uint64_t, BinderNode>::iterator& it) {
    bool shouldShutdown = false;

sp<IBinder> RpcState::tryEraseNode(std::map<uint64_t, BinderNode>::iterator& it) {
    sp<IBinder> ref;

    if (it->second.timesSent == 0) {
@@ -1245,26 +1239,8 @@ sp<IBinder> RpcState::tryEraseNode(const sp<RpcSession>& session, RpcMutexUnique
            LOG_ALWAYS_FATAL_IF(!it->second.asyncTodo.empty(),
                                "Can't delete binder w/ pending async transactions");
            mNodeForAddress.erase(it);

            if (mNodeForAddress.size() == 0) {
                shouldShutdown = true;
        }
    }
    }

    // If we shutdown, prevent RpcState from being re-used. This prevents another
    // thread from getting the root object again.
    if (shouldShutdown) {
        clear(std::move(nodeLock));
    } else {
        nodeLock.unlock(); // explicit
    }
    // LOCK IS RELEASED

    if (shouldShutdown) {
        ALOGI("RpcState has no binders left, so triggering shutdown...");
        (void)session->shutdownAndWait(false);
    }

    return ref;
}
+5 −15
Original line number Diff line number Diff line
@@ -168,7 +168,6 @@ public:
    void clear();

private:
    void clear(RpcMutexUniqueLock nodeLock);
    void dumpLocked();

    // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
@@ -269,20 +268,11 @@ private:
        std::string toString() const;
    };

    // Checks if there is any reference left to a node and erases it. If this
    // is the last node, shuts down the session.
    //
    // Node lock is passed here for convenience, so that we can release it
    // and terminate the session, but we could leave it up to the caller
    // by returning a continuation if we needed to erase multiple specific
    // nodes. It may be tempting to allow the client to keep on holding the
    // lock and instead just return whether or not we should shutdown, but
    // this introduces the possibility that another thread calls
    // getRootBinder and thinks it is valid, rather than immediately getting
    // an error.
    sp<IBinder> tryEraseNode(const sp<RpcSession>& session, RpcMutexUniqueLock nodeLock,
                             std::map<uint64_t, BinderNode>::iterator& it);

    // checks if there is any reference left to a node and erases it. If erase
    // happens, and there is a strong reference to the binder kept by
    // binderNode, this returns that strong reference, so that it can be
    // dropped after any locks are removed.
    sp<IBinder> tryEraseNode(std::map<uint64_t, BinderNode>::iterator& it);
    // true - success
    // false - session shutdown, halt
    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node);
+0 −3
Original line number Diff line number Diff line
@@ -51,9 +51,6 @@ constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION_RPC_HEADER_FEATURE_EXPLICIT_PARCEL_
 * This represents a session (group of connections) between a client
 * and a server. Multiple connections are needed for multiple parallel "binder"
 * calls which may also have nested calls.
 *
 * Once a binder exists in the session, if all references to all binders are dropped,
 * the session shuts down.
 */
class RpcSession final : public virtual RefBase {
public:
+2 −6
Original line number Diff line number Diff line
@@ -102,11 +102,9 @@ std::unique_ptr<RpcTransportCtxFactory> makeFactoryTls() {
}

static sp<RpcSession> gSession = RpcSession::make();
static sp<IBinder> gRpcBinder;
// Certificate validation happens during handshake and does not affect the result of benchmarks.
// Skip certificate validation to simplify the setup process.
static sp<RpcSession> gSessionTls = RpcSession::make(makeFactoryTls());
static sp<IBinder> gRpcTlsBinder;
#ifdef __BIONIC__
static const String16 kKernelBinderInstance = String16(u"binderRpcBenchmark-control");
static sp<IBinder> gKernelBinder;
@@ -120,9 +118,9 @@ static sp<IBinder> getBinderForOptions(benchmark::State& state) {
            return gKernelBinder;
#endif
        case RPC:
            return gRpcBinder;
            return gSession->getRootObject();
        case RPC_TLS:
            return gRpcTlsBinder;
            return gSessionTls->getRootObject();
        default:
            LOG(FATAL) << "Unknown transport value: " << transport;
            return nullptr;
@@ -256,13 +254,11 @@ int main(int argc, char** argv) {
    (void)unlink(addr.c_str());
    forkRpcServer(addr.c_str(), RpcServer::make(RpcTransportCtxFactoryRaw::make()));
    setupClient(gSession, addr.c_str());
    gRpcBinder = gSession->getRootObject();

    std::string tlsAddr = tmp + "/binderRpcTlsBenchmark";
    (void)unlink(tlsAddr.c_str());
    forkRpcServer(tlsAddr.c_str(), RpcServer::make(makeFactoryTls()));
    setupClient(gSessionTls, tlsAddr.c_str());
    gRpcTlsBinder = gSessionTls->getRootObject();

    ::benchmark::RunSpecifiedBenchmarks();
    return 0;
+15 −50
Original line number Diff line number Diff line
@@ -163,8 +163,7 @@ public:
            session.root = nullptr;
        }

        for (size_t sessionNum = 0; sessionNum < sessions.size(); sessionNum++) {
            auto& info = sessions.at(sessionNum);
        for (auto& info : sessions) {
            sp<RpcSession>& session = info.session;

            EXPECT_NE(nullptr, session);
@@ -180,7 +179,6 @@ public:
            for (size_t i = 0; i < 3; i++) {
                sp<RpcSession> strongSession = weakSession.promote();
                EXPECT_EQ(nullptr, strongSession)
                        << "For session " << sessionNum << ". "
                        << (debugBacktrace(host.getPid()), debugBacktrace(getpid()),
                            "Leaked sess: ")
                        << strongSession->getStrongCount() << " checked time " << i;
@@ -256,10 +254,6 @@ std::unique_ptr<ProcessSession> BinderRpc::createRpcTestSocketServerProcessEtc(
        const BinderRpcOptions& options) {
    CHECK_GE(options.numSessions, 1) << "Must have at least one session to a server";

    if (options.numIncomingConnectionsBySession.size() != 0) {
        CHECK_EQ(options.numIncomingConnectionsBySession.size(), options.numSessions);
    }

    SocketType socketType = std::get<0>(GetParam());
    RpcSecurity rpcSecurity = std::get<1>(GetParam());
    uint32_t clientVersion = std::get<2>(GetParam());
@@ -357,15 +351,9 @@ std::unique_ptr<ProcessSession> BinderRpc::createRpcTestSocketServerProcessEtc(

    status_t status;

    for (size_t i = 0; i < sessions.size(); i++) {
        const auto& session = sessions.at(i);

        size_t numIncoming = options.numIncomingConnectionsBySession.size() > 0
                ? options.numIncomingConnectionsBySession.at(i)
                : 0;

    for (const auto& session : sessions) {
        CHECK(session->setProtocolVersion(clientVersion));
        session->setMaxIncomingThreads(numIncoming);
        session->setMaxIncomingThreads(options.numIncomingConnections);
        session->setMaxOutgoingConnections(options.numOutgoingConnections);
        session->setFileDescriptorTransportMode(options.clientFileDescriptorTransportMode);

@@ -663,32 +651,6 @@ TEST_P(BinderRpc, OnewayCallExhaustion) {
    proc.proc->sessions.erase(proc.proc->sessions.begin() + 1);
}

TEST_P(BinderRpc, SessionWithIncomingThreadpoolDoesntLeak) {
    if (clientOrServerSingleThreaded()) {
        GTEST_SKIP() << "This test requires multiple threads";
    }

    // session 0 - will check for leaks in destructor of proc
    // session 1 - we want to make sure it gets deleted when we drop all references to it
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numIncomingConnectionsBySession = {0, 1}, .numSessions = 2});

    wp<RpcSession> session = proc.proc->sessions.at(1).session;

    // remove all references to the second session
    proc.proc->sessions.at(1).root = nullptr;
    proc.proc->sessions.erase(proc.proc->sessions.begin() + 1);

    // TODO(b/271830568) more efficient way to wait for other incoming threadpool
    // to drain commands.
    for (size_t i = 0; i < 100; i++) {
        usleep(10 * 1000);
        if (session.promote() == nullptr) break;
    }

    EXPECT_EQ(nullptr, session.promote());
}

TEST_P(BinderRpc, SingleDeathRecipient) {
    if (clientOrServerSingleThreaded()) {
        GTEST_SKIP() << "This test requires multiple threads";
@@ -706,7 +668,7 @@ TEST_P(BinderRpc, SingleDeathRecipient) {

    // Death recipient needs to have an incoming connection to be called
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numSessions = 1, .numIncomingConnectionsBySession = {1}});
            {.numThreads = 1, .numSessions = 1, .numIncomingConnections = 1});

    auto dr = sp<MyDeathRec>::make();
    ASSERT_EQ(OK, proc.rootBinder->linkToDeath(dr, (void*)1, 0));
@@ -719,10 +681,6 @@ TEST_P(BinderRpc, SingleDeathRecipient) {
    ASSERT_TRUE(dr->mCv.wait_for(lock, 100ms, [&]() { return dr->dead; }));

    // need to wait for the session to shutdown so we don't "Leak session"
    // can't do this before checking the death recipient by calling
    // forceShutdown earlier, because shutdownAndWait will also trigger
    // a death recipient, but if we had a way to wait for the service
    // to gracefully shutdown, we could use that here.
    EXPECT_TRUE(proc.proc->sessions.at(0).session->shutdownAndWait(true));
    proc.expectAlreadyShutdown = true;
}
@@ -744,7 +702,7 @@ TEST_P(BinderRpc, SingleDeathRecipientOnShutdown) {

    // Death recipient needs to have an incoming connection to be called
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numSessions = 1, .numIncomingConnectionsBySession = {1}});
            {.numThreads = 1, .numSessions = 1, .numIncomingConnections = 1});

    auto dr = sp<MyDeathRec>::make();
    EXPECT_EQ(OK, proc.rootBinder->linkToDeath(dr, (void*)1, 0));
@@ -777,7 +735,8 @@ TEST_P(BinderRpc, DeathRecipientFailsWithoutIncoming) {
        void binderDied(const wp<IBinder>& /* who */) override {}
    };

    auto proc = createRpcTestSocketServerProcess({.numThreads = 1, .numSessions = 1});
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numSessions = 1, .numIncomingConnections = 0});

    auto dr = sp<MyDeathRec>::make();
    EXPECT_EQ(INVALID_OPERATION, proc.rootBinder->linkToDeath(dr, (void*)1, 0));
@@ -796,13 +755,19 @@ TEST_P(BinderRpc, UnlinkDeathRecipient) {

    // Death recipient needs to have an incoming connection to be called
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = 1, .numSessions = 1, .numIncomingConnectionsBySession = {1}});
            {.numThreads = 1, .numSessions = 1, .numIncomingConnections = 1});

    auto dr = sp<MyDeathRec>::make();
    ASSERT_EQ(OK, proc.rootBinder->linkToDeath(dr, (void*)1, 0));
    ASSERT_EQ(OK, proc.rootBinder->unlinkToDeath(dr, (void*)1, 0, nullptr));

    proc.forceShutdown();
    if (auto status = proc.rootIface->scheduleShutdown(); !status.isOk()) {
        EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
    }

    // need to wait for the session to shutdown so we don't "Leak session"
    EXPECT_TRUE(proc.proc->sessions.at(0).session->shutdownAndWait(true));
    proc.expectAlreadyShutdown = true;
}

TEST_P(BinderRpc, Die) {
Loading