Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 45916263 authored by Steven Moreland's avatar Steven Moreland Committed by Automerger Merge Worker
Browse files

Merge "libbinder: shutdown session threads" am: c1500480 am: e8990d0d am:...

Merge "libbinder: shutdown session threads" am: c1500480 am: e8990d0d am: bf6d2a9c am: 6c6cd10c

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1716214

Change-Id: I02b7a1f5cf7fef32a5c88983f63b83dfc0497306
parents eaa613b4 6c6cd10c
Loading
Loading
Loading
Loading
+10 −4
Original line number Diff line number Diff line
@@ -192,10 +192,10 @@ bool RpcServer::shutdown() {
    }

    mShutdownTrigger->trigger();
    while (mJoinThreadRunning || !mConnectingThreads.empty()) {
    while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
        ALOGI("Waiting for RpcServer to shut down. Join thread running: %d, Connecting threads: "
              "%zu",
              mJoinThreadRunning, mConnectingThreads.size());
              "%zu, Sessions: %zu",
              mJoinThreadRunning, mConnectingThreads.size(), mSessions.size());
        mShutdownCv.wait(_l);
    }

@@ -278,7 +278,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
            server->mSessionIdCounter++;

            session = RpcSession::make();
            session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter);
            session->setForServer(wp<RpcServer>(server), server->mSessionIdCounter,
                                  server->mShutdownTrigger);

            server->mSessions[server->mSessionIdCounter] = session;
        } else {
@@ -344,6 +345,11 @@ void RpcServer::onSessionTerminating(const sp<RpcSession>& session) {
    (void)mSessions.erase(it);
}

void RpcServer::onSessionThreadEnding(const sp<RpcSession>& session) {
    (void)session;
    mShutdownCv.notify_all();
}

bool RpcServer::hasServer() {
    LOG_ALWAYS_FATAL_IF(!mAgreedExperimental, "no!");
    std::lock_guard<std::mutex> _l(mLock);
+19 −1
Original line number Diff line number Diff line
@@ -207,12 +207,19 @@ void RpcSession::join(unique_fd client) {
    LOG_ALWAYS_FATAL_IF(!removeServerConnection(connection),
                        "bad state: connection object guaranteed to be in list");

    sp<RpcServer> server;
    {
        std::lock_guard<std::mutex> _l(mMutex);
        auto it = mThreads.find(std::this_thread::get_id());
        LOG_ALWAYS_FATAL_IF(it == mThreads.end());
        it->second.detach();
        mThreads.erase(it);

        server = mForServer.promote();
    }

    if (server != nullptr) {
        server->onSessionThreadEnding(sp<RpcSession>::fromExisting(this));
    }
}

@@ -314,14 +321,25 @@ bool RpcSession::setupOneSocketClient(const RpcSocketAddress& addr, int32_t id)

void RpcSession::addClientConnection(unique_fd fd) {
    std::lock_guard<std::mutex> _l(mMutex);

    if (mShutdownTrigger == nullptr) {
        mShutdownTrigger = FdTrigger::make();
    }

    sp<RpcConnection> session = sp<RpcConnection>::make();
    session->fd = std::move(fd);
    mClientConnections.push_back(session);
}

void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId) {
void RpcSession::setForServer(const wp<RpcServer>& server, int32_t sessionId,
                              const std::shared_ptr<FdTrigger>& shutdownTrigger) {
    LOG_ALWAYS_FATAL_IF(mForServer.unsafe_get() != nullptr);
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
    LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);

    mId = sessionId;
    mForServer = server;
    mShutdownTrigger = shutdownTrigger;
}

sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
+16 −23
Original line number Diff line number Diff line
@@ -229,30 +229,22 @@ bool RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void*
    return true;
}

bool RpcState::rpcRec(const base::unique_fd& fd, const char* what, void* data, size_t size) {
bool RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session, const char* what,
                      void* data, size_t size) {
    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot rec %s at size %zu (too big)", what, size);
        terminate();
        return false;
    }

    ssize_t recd = TEMP_FAILURE_RETRY(recv(fd.get(), data, size, MSG_WAITALL | MSG_NOSIGNAL));

    if (recd < 0 || recd != static_cast<ssize_t>(size)) {
        terminate();

        if (recd == 0 && errno == 0) {
            LOG_RPC_DETAIL("No more data when trying to read %s on fd %d", what, fd.get());
    if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size);
        status != OK) {
        ALOGE("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
              statusToString(status).c_str());
        return false;
    }

        ALOGE("Failed to read %s (received %zd of %zu bytes) on fd %d, error: %s", what, recd, size,
              fd.get(), strerror(errno));
        return false;
    } else {
    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
    }

    return true;
}

@@ -398,7 +390,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
                                Parcel* reply) {
    RpcWireHeader command;
    while (true) {
        if (!rpcRec(fd, "command header", &command, sizeof(command))) {
        if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
            return DEAD_OBJECT;
        }

@@ -413,7 +405,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
        return NO_MEMORY;
    }

    if (!rpcRec(fd, "reply body", data.data(), command.bodySize)) {
    if (!rpcRec(fd, session, "reply body", data.data(), command.bodySize)) {
        return DEAD_OBJECT;
    }

@@ -465,7 +457,7 @@ status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcS
    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());

    RpcWireHeader command;
    if (!rpcRec(fd, "command header", &command, sizeof(command))) {
    if (!rpcRec(fd, session, "command header", &command, sizeof(command))) {
        return DEAD_OBJECT;
    }

@@ -493,7 +485,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
        case RPC_COMMAND_TRANSACT:
            return processTransact(fd, session, command);
        case RPC_COMMAND_DEC_STRONG:
            return processDecStrong(fd, command);
            return processDecStrong(fd, session, command);
    }

    // We should always know the version of the opposing side, and since the
@@ -513,7 +505,7 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio
    if (!transactionData.valid()) {
        return NO_MEMORY;
    }
    if (!rpcRec(fd, "transaction body", transactionData.data(), transactionData.size())) {
    if (!rpcRec(fd, session, "transaction body", transactionData.data(), transactionData.size())) {
        return DEAD_OBJECT;
    }

@@ -626,7 +618,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
                        //
                        // sessions associated with servers must have an ID
                        // (hence abort)
                        int32_t id = session->getPrivateAccessorForId().get().value();
                        int32_t id = session->mId.value();
                        replyStatus = reply.writeInt32(id);
                        break;
                    }
@@ -721,14 +713,15 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
    return OK;
}

status_t RpcState::processDecStrong(const base::unique_fd& fd, const RpcWireHeader& command) {
status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
                                    const RpcWireHeader& command) {
    LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command);

    CommandData commandData(command.bodySize);
    if (!commandData.valid()) {
        return NO_MEMORY;
    }
    if (!rpcRec(fd, "dec ref body", commandData.data(), commandData.size())) {
    if (!rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size())) {
        return DEAD_OBJECT;
    }

+3 −1
Original line number Diff line number Diff line
@@ -117,7 +117,8 @@ private:

    [[nodiscard]] bool rpcSend(const base::unique_fd& fd, const char* what, const void* data,
                               size_t size);
    [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const char* what, void* data, size_t size);
    [[nodiscard]] bool rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
                              const char* what, void* data, size_t size);

    [[nodiscard]] status_t waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
                                        Parcel* reply);
@@ -130,6 +131,7 @@ private:
                                                   const sp<RpcSession>& session,
                                                   CommandData transactionData);
    [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
                                            const sp<RpcSession>& session,
                                            const RpcWireHeader& command);

    struct BinderNode {
+2 −1
Original line number Diff line number Diff line
@@ -150,6 +150,7 @@ public:
    // internal use only

    void onSessionTerminating(const sp<RpcSession>& session);
    void onSessionThreadEnding(const sp<RpcSession>& session);

private:
    friend sp<RpcServer>;
@@ -171,7 +172,7 @@ private:
    wp<IBinder> mRootObjectWeak;
    std::map<int32_t, sp<RpcSession>> mSessions;
    int32_t mSessionIdCounter = 0;
    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
    std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
    std::condition_variable mShutdownCv;
};

Loading