Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2c1d0013 authored by Steven Moreland's avatar Steven Moreland Committed by Automerger Merge Worker
Browse files

Merge changes Ibe1a854b,I8c281ad2,I9e290cd0,I0035be2d am: 1b48b713

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1727277

Change-Id: Ia3bc2c46ed5a9ae50523a17855f1357229968fd8
parents 5f0b1f59 1b48b713
Loading
Loading
Loading
Loading
+14 −5
Original line number Original line Diff line number Diff line
@@ -187,6 +187,11 @@ bool RpcServer::shutdown() {
    }
    }


    mShutdownTrigger->trigger();
    mShutdownTrigger->trigger();
    for (auto& [id, session] : mSessions) {
        (void)id;
        session->mShutdownTrigger->trigger();
    }

    while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
    while (mJoinThreadRunning || !mConnectingThreads.empty() || !mSessions.empty()) {
        if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
        if (std::cv_status::timeout == mShutdownCv.wait_for(_l, std::chrono::seconds(1))) {
            ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
            ALOGE("Waiting for RpcServer to shut down (1s w/o progress). Join thread running: %d, "
@@ -261,7 +266,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        };
        };
        server->mConnectingThreads.erase(threadId);
        server->mConnectingThreads.erase(threadId);


        if (!idValid) {
        if (!idValid || server->mShutdownTrigger->isTriggered()) {
            return;
            return;
        }
        }


@@ -276,10 +281,14 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie


            session = RpcSession::make();
            session = RpcSession::make();
            session->setMaxThreads(server->mMaxThreads);
            session->setMaxThreads(server->mMaxThreads);
            session->setForServer(server,
            if (!session->setForServer(server,
                                       sp<RpcServer::EventListener>::fromExisting(
                                       sp<RpcServer::EventListener>::fromExisting(
                                          static_cast<RpcServer::EventListener*>(server.get())),
                                               static_cast<RpcServer::EventListener*>(
                                  server->mSessionIdCounter, server->mShutdownTrigger);
                                                       server.get())),
                                       server->mSessionIdCounter)) {
                ALOGE("Failed to attach server to session");
                return;
            }


            server->mSessions[server->mSessionIdCounter] = session;
            server->mSessions[server->mSessionIdCounter] = session;
        } else {
        } else {
+25 −13
Original line number Original line Diff line number Diff line
@@ -113,17 +113,21 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
    return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
    return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
}
}


bool RpcSession::shutdown() {
bool RpcSession::shutdownAndWait(bool wait) {
    std::unique_lock<std::mutex> _l(mMutex);
    std::unique_lock<std::mutex> _l(mMutex);
    LOG_ALWAYS_FATAL_IF(mForServer.promote() != nullptr, "Can only shut down client session");
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Shutdown trigger not installed");
    LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");


    mShutdownTrigger->trigger();
    mShutdownTrigger->trigger();
    mShutdownListener->waitForShutdown(_l);
    mState->terminate();


    if (wait) {
        LOG_ALWAYS_FATAL_IF(mShutdownListener == nullptr, "Shutdown listener not installed");
        mShutdownListener->waitForShutdown(_l);
        LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
        LOG_ALWAYS_FATAL_IF(!mThreads.empty(), "Shutdown failed");
    }

    _l.unlock();
    mState->clear();

    return true;
    return true;
}
}


@@ -139,12 +143,15 @@ status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Pa
status_t RpcSession::sendDecStrong(const RpcAddress& address) {
status_t RpcSession::sendDecStrong(const RpcAddress& address) {
    ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
    ExclusiveConnection connection(sp<RpcSession>::fromExisting(this),
                                   ConnectionUse::CLIENT_REFCOUNT);
                                   ConnectionUse::CLIENT_REFCOUNT);
    return state()->sendDecStrong(connection.fd(), address);
    return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
}
}


std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
    auto ret = std::make_unique<RpcSession::FdTrigger>();
    auto ret = std::make_unique<RpcSession::FdTrigger>();
    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) return nullptr;
    if (!android::base::Pipe(&ret->mRead, &ret->mWrite)) {
        ALOGE("Could not create pipe %s", strerror(errno));
        return nullptr;
    }
    return ret;
    return ret;
}
}


@@ -152,6 +159,10 @@ void RpcSession::FdTrigger::trigger() {
    mWrite.reset();
    mWrite.reset();
}
}


bool RpcSession::FdTrigger::isTriggered() {
    return mWrite == -1;
}

status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
    while (true) {
    while (true) {
        pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
        pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
@@ -408,20 +419,21 @@ bool RpcSession::addClientConnection(unique_fd fd) {
    return true;
    return true;
}
}


void RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
                              int32_t sessionId,
                              int32_t sessionId) {
                              const std::shared_ptr<FdTrigger>& shutdownTrigger) {
    LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
    LOG_ALWAYS_FATAL_IF(mForServer != nullptr);
    LOG_ALWAYS_FATAL_IF(server == nullptr);
    LOG_ALWAYS_FATAL_IF(server == nullptr);
    LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
    LOG_ALWAYS_FATAL_IF(mEventListener != nullptr);
    LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
    LOG_ALWAYS_FATAL_IF(eventListener == nullptr);
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr);
    LOG_ALWAYS_FATAL_IF(shutdownTrigger == nullptr);

    mShutdownTrigger = FdTrigger::make();
    if (mShutdownTrigger == nullptr) return false;


    mId = sessionId;
    mId = sessionId;
    mForServer = server;
    mForServer = server;
    mEventListener = eventListener;
    mEventListener = eventListener;
    mShutdownTrigger = shutdownTrigger;
    return true;
}
}


sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
sp<RpcSession::RpcConnection> RpcSession::assignServerToThisThread(unique_fd fd) {
+86 −56
Original line number Original line Diff line number Diff line
@@ -137,9 +137,39 @@ void RpcState::dump() {
    dumpLocked();
    dumpLocked();
}
}


void RpcState::terminate() {
void RpcState::clear() {
    std::unique_lock<std::mutex> _l(mNodeMutex);
    std::unique_lock<std::mutex> _l(mNodeMutex);
    terminate(_l);

    if (mTerminated) {
        LOG_ALWAYS_FATAL_IF(!mNodeForAddress.empty(),
                            "New state should be impossible after terminating!");
        return;
    }

    if (SHOULD_LOG_RPC_DETAIL) {
        ALOGE("RpcState::clear()");
        dumpLocked();
    }

    // if the destructor of a binder object makes another RPC call, then calling
    // decStrong could deadlock. So, we must hold onto these binders until
    // mNodeMutex is no longer taken.
    std::vector<sp<IBinder>> tempHoldBinder;

    mTerminated = true;
    for (auto& [address, node] : mNodeForAddress) {
        sp<IBinder> binder = node.binder.promote();
        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());

        if (node.sentRef != nullptr) {
            tempHoldBinder.push_back(node.sentRef);
        }
    }

    mNodeForAddress.clear();

    _l.unlock();
    tempHoldBinder.clear(); // explicit
}
}


void RpcState::dumpLocked() {
void RpcState::dumpLocked() {
@@ -170,32 +200,6 @@ void RpcState::dumpLocked() {
    ALOGE("END DUMP OF RpcState");
    ALOGE("END DUMP OF RpcState");
}
}


void RpcState::terminate(std::unique_lock<std::mutex>& lock) {
    if (SHOULD_LOG_RPC_DETAIL) {
        ALOGE("RpcState::terminate()");
        dumpLocked();
    }

    // if the destructor of a binder object makes another RPC call, then calling
    // decStrong could deadlock. So, we must hold onto these binders until
    // mNodeMutex is no longer taken.
    std::vector<sp<IBinder>> tempHoldBinder;

    mTerminated = true;
    for (auto& [address, node] : mNodeForAddress) {
        sp<IBinder> binder = node.binder.promote();
        LOG_ALWAYS_FATAL_IF(binder == nullptr, "Binder %p expected to be owned.", binder.get());

        if (node.sentRef != nullptr) {
            tempHoldBinder.push_back(node.sentRef);
        }
    }

    mNodeForAddress.clear();

    lock.unlock();
    tempHoldBinder.clear(); // explicit
}


RpcState::CommandData::CommandData(size_t size) : mSize(size) {
RpcState::CommandData::CommandData(size_t size) : mSize(size) {
    // The maximum size for regular binder is 1MB for all concurrent
    // The maximum size for regular binder is 1MB for all concurrent
@@ -218,13 +222,13 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {
    mData.reset(new (std::nothrow) uint8_t[size]);
    mData.reset(new (std::nothrow) uint8_t[size]);
}
}


status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const void* data,
status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
                           size_t size) {
                           const char* what, const void* data, size_t size) {
    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());


    if (size > std::numeric_limits<ssize_t>::max()) {
    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot send %s at size %zu (too big)", what, size);
        ALOGE("Cannot send %s at size %zu (too big)", what, size);
        terminate();
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
        return BAD_VALUE;
    }
    }


@@ -235,7 +239,7 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const char* what, const vo
        LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
        LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
                       size, fd.get(), strerror(savedErrno));
                       size, fd.get(), strerror(savedErrno));


        terminate();
        (void)session->shutdownAndWait(false);
        return -savedErrno;
        return -savedErrno;
    }
    }


@@ -246,7 +250,7 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& sessi
                          const char* what, void* data, size_t size) {
                          const char* what, void* data, size_t size) {
    if (size > std::numeric_limits<ssize_t>::max()) {
    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot rec %s at size %zu (too big)", what, size);
        ALOGE("Cannot rec %s at size %zu (too big)", what, size);
        terminate();
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
        return BAD_VALUE;
    }
    }


@@ -358,7 +362,11 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress&


        if (flags & IBinder::FLAG_ONEWAY) {
        if (flags & IBinder::FLAG_ONEWAY) {
            asyncNumber = it->second.asyncNumber;
            asyncNumber = it->second.asyncNumber;
            if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
            if (!nodeProgressAsyncNumber(&it->second)) {
                _l.unlock();
                (void)session->shutdownAndWait(false);
                return DEAD_OBJECT;
            }
        }
        }
    }
    }


@@ -390,7 +398,7 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress&
           data.dataSize());
           data.dataSize());


    if (status_t status =
    if (status_t status =
                rpcSend(fd, "transaction", transactionData.data(), transactionData.size());
                rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
        status != OK)
        status != OK)
        return status;
        return status;


@@ -442,7 +450,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
    if (command.bodySize < sizeof(RpcWireReply)) {
    if (command.bodySize < sizeof(RpcWireReply)) {
        ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
        ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireReply. Terminating!",
              sizeof(RpcWireReply), command.bodySize);
              sizeof(RpcWireReply), command.bodySize);
        terminate();
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
        return BAD_VALUE;
    }
    }
    RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data());
    RpcWireReply* rpcReply = reinterpret_cast<RpcWireReply*>(data.data());
@@ -457,7 +465,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
    return OK;
    return OK;
}
}


status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& addr) {
status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
                                 const RpcAddress& addr) {
    {
    {
        std::lock_guard<std::mutex> _l(mNodeMutex);
        std::lock_guard<std::mutex> _l(mNodeMutex);
        if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
        if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -476,10 +485,10 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const RpcAddress& ad
            .command = RPC_COMMAND_DEC_STRONG,
            .command = RPC_COMMAND_DEC_STRONG,
            .bodySize = sizeof(RpcWireAddress),
            .bodySize = sizeof(RpcWireAddress),
    };
    };
    if (status_t status = rpcSend(fd, "dec ref header", &cmd, sizeof(cmd)); status != OK)
    if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
        return status;
        return status;
    if (status_t status =
    if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
                rpcSend(fd, "dec ref body", &addr.viewRawEmbedded(), sizeof(RpcWireAddress));
                                  sizeof(RpcWireAddress));
        status != OK)
        status != OK)
        return status;
        return status;
    return OK;
    return OK;
@@ -538,7 +547,7 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
    // also can't consider it a fatal error because this would allow any client
    // also can't consider it a fatal error because this would allow any client
    // to kill us, so ending the session for misbehaving client.
    // to kill us, so ending the session for misbehaving client.
    ALOGE("Unknown RPC command %d - terminating session", command.command);
    ALOGE("Unknown RPC command %d - terminating session", command.command);
    terminate();
    (void)session->shutdownAndWait(false);
    return DEAD_OBJECT;
    return DEAD_OBJECT;
}
}
status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -571,7 +580,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
    if (transactionData.size() < sizeof(RpcWireTransaction)) {
    if (transactionData.size() < sizeof(RpcWireTransaction)) {
        ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
        ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
              sizeof(RpcWireTransaction), transactionData.size());
              sizeof(RpcWireTransaction), transactionData.size());
        terminate();
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
        return BAD_VALUE;
    }
    }
    RpcWireTransaction* transaction = reinterpret_cast<RpcWireTransaction*>(transactionData.data());
    RpcWireTransaction* transaction = reinterpret_cast<RpcWireTransaction*>(transactionData.data());
@@ -600,15 +609,15 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
            // session.
            // session.
            ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
            ALOGE("While transacting, binder has been deleted at address %s. Terminating!",
                  addr.toString().c_str());
                  addr.toString().c_str());
            terminate();
            (void)session->shutdownAndWait(false);
            replyStatus = BAD_VALUE;
            replyStatus = BAD_VALUE;
        } else if (target->localBinder() == nullptr) {
        } else if (target->localBinder() == nullptr) {
            ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
            ALOGE("Unknown binder address or non-local binder, not address %s. Terminating!",
                  addr.toString().c_str());
                  addr.toString().c_str());
            terminate();
            (void)session->shutdownAndWait(false);
            replyStatus = BAD_VALUE;
            replyStatus = BAD_VALUE;
        } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
        } else if (transaction->flags & IBinder::FLAG_ONEWAY) {
            std::lock_guard<std::mutex> _l(mNodeMutex);
            std::unique_lock<std::mutex> _l(mNodeMutex);
            auto it = mNodeForAddress.find(addr);
            auto it = mNodeForAddress.find(addr);
            if (it->second.binder.promote() != target) {
            if (it->second.binder.promote() != target) {
                ALOGE("Binder became invalid during transaction. Bad client? %s",
                ALOGE("Binder became invalid during transaction. Bad client? %s",
@@ -617,16 +626,33 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
            } else if (transaction->asyncNumber != it->second.asyncNumber) {
            } else if (transaction->asyncNumber != it->second.asyncNumber) {
                // we need to process some other asynchronous transaction
                // we need to process some other asynchronous transaction
                // first
                // first
                // TODO(b/183140903): limit enqueues/detect overfill for bad client
                // TODO(b/183140903): detect when an object is deleted when it still has
                //        pending async transactions
                it->second.asyncTodo.push(BinderNode::AsyncTodo{
                it->second.asyncTodo.push(BinderNode::AsyncTodo{
                        .ref = target,
                        .ref = target,
                        .data = std::move(transactionData),
                        .data = std::move(transactionData),
                        .asyncNumber = transaction->asyncNumber,
                        .asyncNumber = transaction->asyncNumber,
                });
                });
                LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s", transaction->asyncNumber,

                               addr.toString().c_str());
                size_t numPending = it->second.asyncTodo.size();
                LOG_RPC_DETAIL("Enqueuing %" PRId64 " on %s (%zu pending)",
                               transaction->asyncNumber, addr.toString().c_str(), numPending);

                constexpr size_t kArbitraryOnewayCallTerminateLevel = 10000;
                constexpr size_t kArbitraryOnewayCallWarnLevel = 1000;
                constexpr size_t kArbitraryOnewayCallWarnPer = 1000;

                if (numPending >= kArbitraryOnewayCallWarnLevel) {
                    if (numPending >= kArbitraryOnewayCallTerminateLevel) {
                        ALOGE("WARNING: %zu pending oneway transactions. Terminating!", numPending);
                        _l.unlock();
                        (void)session->shutdownAndWait(false);
                        return FAILED_TRANSACTION;
                    }

                    if (numPending % kArbitraryOnewayCallWarnPer == 0) {
                        ALOGW("Warning: many oneway transactions built up on %p (%zu)",
                              target.get(), numPending);
                    }
                }
                return OK;
                return OK;
            }
            }
        }
        }
@@ -707,7 +733,11 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
            // last refcount dropped after this transaction happened
            // last refcount dropped after this transaction happened
            if (it == mNodeForAddress.end()) return OK;
            if (it == mNodeForAddress.end()) return OK;


            if (!nodeProgressAsyncNumber(&it->second, _l)) return DEAD_OBJECT;
            if (!nodeProgressAsyncNumber(&it->second)) {
                _l.unlock();
                (void)session->shutdownAndWait(false);
                return DEAD_OBJECT;
            }


            if (it->second.asyncTodo.size() == 0) return OK;
            if (it->second.asyncTodo.size() == 0) return OK;
            if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) {
            if (it->second.asyncTodo.top().asyncNumber == it->second.asyncNumber) {
@@ -753,7 +783,7 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
    memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
    memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
           reply.dataSize());
           reply.dataSize());


    return rpcSend(fd, "reply", replyData.data(), replyData.size());
    return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
}
}


status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -772,7 +802,7 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi
    if (command.bodySize < sizeof(RpcWireAddress)) {
    if (command.bodySize < sizeof(RpcWireAddress)) {
        ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
        ALOGE("Expecting %zu but got %" PRId32 " bytes for RpcWireAddress. Terminating!",
              sizeof(RpcWireAddress), command.bodySize);
              sizeof(RpcWireAddress), command.bodySize);
        terminate();
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
        return BAD_VALUE;
    }
    }
    RpcWireAddress* address = reinterpret_cast<RpcWireAddress*>(commandData.data());
    RpcWireAddress* address = reinterpret_cast<RpcWireAddress*>(commandData.data());
@@ -790,7 +820,8 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi
    if (target == nullptr) {
    if (target == nullptr) {
        ALOGE("While requesting dec strong, binder has been deleted at address %s. Terminating!",
        ALOGE("While requesting dec strong, binder has been deleted at address %s. Terminating!",
              addr.toString().c_str());
              addr.toString().c_str());
        terminate();
        _l.unlock();
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
        return BAD_VALUE;
    }
    }


@@ -826,12 +857,11 @@ sp<IBinder> RpcState::tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& i
    return ref;
    return ref;
}
}


bool RpcState::nodeProgressAsyncNumber(BinderNode* node, std::unique_lock<std::mutex>& lock) {
bool RpcState::nodeProgressAsyncNumber(BinderNode* node) {
    // 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to
    // 2**64 =~ 10**19 =~ 1000 transactions per second for 585 million years to
    // a single binder
    // a single binder
    if (node->asyncNumber >= std::numeric_limits<decltype(node->asyncNumber)>::max()) {
    if (node->asyncNumber >= std::numeric_limits<decltype(node->asyncNumber)>::max()) {
        ALOGE("Out of async transaction IDs. Terminating");
        ALOGE("Out of async transaction IDs. Terminating");
        terminate(lock);
        return false;
        return false;
    }
    }
    node->asyncNumber++;
    node->asyncNumber++;
+7 −8
Original line number Original line Diff line number Diff line
@@ -65,7 +65,8 @@ public:
                                           uint32_t code, const Parcel& data,
                                           uint32_t code, const Parcel& data,
                                           const sp<RpcSession>& session, Parcel* reply,
                                           const sp<RpcSession>& session, Parcel* reply,
                                           uint32_t flags);
                                           uint32_t flags);
    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const RpcAddress& address);
    [[nodiscard]] status_t sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
                                         const RpcAddress& address);


    enum class CommandType {
    enum class CommandType {
        ANY,
        ANY,
@@ -110,11 +111,10 @@ public:
     * WARNING: RpcState is responsible for calling this when the session is
     * WARNING: RpcState is responsible for calling this when the session is
     * no longer recoverable.
     * no longer recoverable.
     */
     */
    void terminate();
    void clear();


private:
private:
    void dumpLocked();
    void dumpLocked();
    void terminate(std::unique_lock<std::mutex>& lock);


    // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
    // Alternative to std::vector<uint8_t> that doesn't abort on allocation failure and caps
    // large allocations to avoid being requested from allocating too much data.
    // large allocations to avoid being requested from allocating too much data.
@@ -130,8 +130,8 @@ private:
        size_t mSize;
        size_t mSize;
    };
    };


    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const char* what, const void* data,
    [[nodiscard]] status_t rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
                                   size_t size);
                                   const char* what, const void* data, size_t size);
    [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
    [[nodiscard]] status_t rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
                                  const char* what, void* data, size_t size);
                                  const char* what, void* data, size_t size);


@@ -204,9 +204,8 @@ private:
    // dropped after any locks are removed.
    // dropped after any locks are removed.
    sp<IBinder> tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& it);
    sp<IBinder> tryEraseNode(std::map<RpcAddress, BinderNode>::iterator& it);
    // true - success
    // true - success
    // false - state terminated, lock gone, halt
    // false - session shutdown, halt
    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node,
    [[nodiscard]] bool nodeProgressAsyncNumber(BinderNode* node);
                                               std::unique_lock<std::mutex>& lock);


    std::mutex mNodeMutex;
    std::mutex mNodeMutex;
    bool mTerminated = false;
    bool mTerminated = false;
+1 −1
Original line number Original line Diff line number Diff line
@@ -173,7 +173,7 @@ private:
    wp<IBinder> mRootObjectWeak;
    wp<IBinder> mRootObjectWeak;
    std::map<int32_t, sp<RpcSession>> mSessions;
    std::map<int32_t, sp<RpcSession>> mSessions;
    int32_t mSessionIdCounter = 0;
    int32_t mSessionIdCounter = 0;
    std::shared_ptr<RpcSession::FdTrigger> mShutdownTrigger;
    std::unique_ptr<RpcSession::FdTrigger> mShutdownTrigger;
    std::condition_variable mShutdownCv;
    std::condition_variable mShutdownCv;
};
};


Loading