Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ee3f4669 authored by Steven Moreland's avatar Steven Moreland
Browse files

libbinder: shutdown session threads

The last piece to completely shutting down servers (this is in
preparation for adding threadpools to server callbacks, which actually
need to be shut down during normal usage).

Bug: 185167543
Test: binderRpcTest

Change-Id: I20d6ac16c58fe6801545fa7be178518201fe075d
parent 2b4f380c
Loading
Loading
Loading
Loading
+10 −4
Original line number Diff line number Diff line
@@ -192,10 +192,10 @@ bool RpcServer::shutdown() {
    }

    mShutdownTrigger->trigger();
    while (mJoinThreadRunning || !mConnectingThreads.empty()) {
    while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
        ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: "
              "%zu",
              mJoinThreadRunning, mConnectingThreads.size());
              "%zu, Sessions: %zu",
              mJoinThreadRunning, mConnectingThreads.size(), mSessions.size());
        mShutdownCv.wait(_l);
    }

@@ -278,7 +278,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
            server->mSessionIdCounter++;

            session = RpcSession::make();
            session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter);
            session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter,
                                  server->mShutdownTrigger);

            server->mSessions[server->mSessionIdCounter] = session;
        } else {
@@ -344,6 +345,11 @@ void RpcServer::onSessionTerminating(const sp<RpcSession>& session) {
    (void)mSessions.erase(it);
}

void RpcServer::onSessionThreadEnding(const sp<RpcSession>& session) {
    (void)session;
    mShutdownCv.notify_all();
}

bool RpcServer::hasServer() {
    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
    std::lock_guard<std::mutex> _l(mLock);
+19 −1
Original line number Diff line number Diff line
@@ -207,12 +207,19 @@ void RpcSession::join(unique_fd client) {
    LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection),
                        "bad state: connection object guaranteed to be in list");

    sp<RpcServer> server;
    {
        std::lock_guard<std::mutex> _l(mMutex);
        auto it = mThreads.find(std::this_thread::get_id());
        LOG_ALWAYS_FATAL_IF(it == mThreads.end());
        it->second.detach();
        mThreads.erase(it);

        server = mForServer.promote();
    }

    if (server != nullptr) {
        server->onSessionThreadEnding(sp<RpcSession>::fromExisting(this));
    }
}

@@ -314,14 +321,25 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id)

void RpcSession::addClientConnection(unique_fd fd) {
    std::lock_guard<std::mutex> _l(mMutex);

    if (mShutdownTrigger == nullptr) {
        mShutdownTrigger = FdTrigger::make();
    }

    sp<RpcConnection> session = sp<RpcConnection>::make();
    session->fd = std::move(fd);
    mClientConnections.push_back(session);
}

void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId) {
void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId,
                              const std::shared_ptr<FdTrigger>& shutdownTrigger) {
    LOG_ALWAYS_FATAL_IF(mForServer.unsafe_get() != nullptr);
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
    LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);

    mId = sessionId;
    mForServer = server;
    mShutdownTrigger = shutdownTrigger;
}

sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
+16 −23
Original line number Diff line number Diff line
@@ -229,30 +229,22 @@ bool RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void*
    return true;
}

bool RpcState::rpcRec(const base::unique_fd& fd, const char* what, void* data, size_t size) {
bool RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session, const char* what,
                      void* data, size_t size) {
    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot rec %s at size %zu (too big)", what, size);
        terminate();
        return false;
    }

    ssize_t recd = TEMP_FAILURE_RETRY(recv(fd.get(), data, size, MSG_WAITALL | MSG_NOSIGNAL));

    if (recd < 0 || recd != static_cast<ssize_t>(size)) {
        terminate();

        if (recd == 0 && errno == 0) {
            LOG_RPC_DETAIL("No more data when trying to read %s on fd %d", what, fd.get());
    if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size);
        status != OK) {
        ALOGE("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
              statusToString(status).c_str());
        return false;
    }

        ALOGE("Failed to read %s (received %zd of %zu bytes) on fd %d, error: %s", what, recd, size,
              fd.get(), strerror(errno));
        return false;
    } else {
    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
    }

    return true;
}

@@ -398,7 +390,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
                                Parcel* reply) {
    RpcWireHeader command;
    while (true) {
        if (!rpcRec(fd, "command header", &command, sizeof(command))) {
        if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
            return DEAD_OBJECT;
        }

@@ -413,7 +405,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
        return NO_MEMORY;
    }

    if (!rpcRec(fd, "reply body", data.data(), command.bodySize)) {
    if (!rpcRec(fd, session, "reply body", data.data(), command.bodySize)) {
        return DEAD_OBJECT;
    }

@@ -465,7 +457,7 @@ status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcS
    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());

    RpcWireHeader command;
    if (!rpcRec(fd, "command header", &command, sizeof(command))) {
    if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
        return DEAD_OBJECT;
    }

@@ -493,7 +485,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
        case RPC_COMMAND_TRANSACT:
            return processTransact(fd, session, command);
        case RPC_COMMAND_DEC_STRONG:
            return processDecStrong(fd, command);
            return processDecStrong(fd, session, command);
    }

    // We should always know the version of the opposing side, and since the
@@ -513,7 +505,7 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio
    if (!transactionData.valid()) {
        return NO_MEMORY;
    }
    if (!rpcRec(fd, "transaction body", transactionData.data(), transactionData.size())) {
    if (!rpcRec(fd, session, "transaction body", transactionData.data(), transactionData.size())) {
        return DEAD_OBJECT;
    }

@@ -626,7 +618,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
                        //
                        // sessions associated with servers must have an ID
                        // (hence abort)
                        int32_t id = session->getPrivateAccessorForId().get().value();
                        int32_t id = session->mId.value();
                        replyStatus = reply.writeInt32(id);
                        break;
                    }
@@ -721,14 +713,15 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
    return OK;
}

status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command) {
status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
                                    const RpcWireHeader& command) {
    LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command);

    CommandData commandData(command.bodySize);
    if (!commandData.valid()) {
        return NO_MEMORY;
    }
    if (!rpcRec(fd, "dec ref body", commandData.data(), commandData.size())) {
    if (!rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size())) {
        return DEAD_OBJECT;
    }

+3 −1
Original line number Diff line number Diff line
@@ -117,7 +117,8 @@ private:

    [[nodiscard]] bool rpcSend(const base::unique_fd& fd, const char* what, const void* data,
                               size_t size);
    [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const char* what, void* data, size_t size);
    [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
                              const char* what, void* data, size_t size);

    [[nodiscard]] status_t waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
                                        Parcel* reply);
@@ -130,6 +131,7 @@ private:
                                                   const sp<RpcSession>& session,
                                                   CommandData transactionData);
    [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
                                            const sp<RpcSession>& session,
                                            const RpcWireHeader& command);

    struct BinderNode {
+2 −1
Original line number Diff line number Diff line
@@ -150,6 +150,7 @@ public:
    // internal use only

    void onSessionTerminating(const sp<RpcSession>& session);
    void onSessionThreadEnding(const sp<RpcSession>& session);

private:
    friend sp<RpcServer>;
@@ -171,7 +172,7 @@ private:
    wp<IBinder> mRootObjectWeak;
    std::map<int32_t, sp<RpcSession>> mSessions;
    int32_t mSessionIdCounter = 0;
    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
    std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
    std::condition_variable mShutdownCv;
};

Loading