Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 43921d5d authored by Steven Moreland's avatar Steven Moreland
Browse files

libbinder: RPC handle built-up refcounts

Generally, in the binder RPC wire protocol, we don't have both the
client and the server writing data into the socket at the same time.
However, in the case of async transactions, this happens in an unbounded
way, because a client may send many oneway transactions while the server
sends back refcounting information related to those transactions (which
we process lazily).

To prevent this data from building up, when sending a transaction, if
we're unable to write it, drain that reference-counting information
instead of waiting.

Bug: 182940634
Test: binderRpcTest (no longer deadlocks in OnewayStressTest)
Test: manually check 'drainCommands' happens in both raw and tls cases
    during this test (checking we are actually getting coverage)
Change-Id: I82039d6188196261b22316e95d8e180c4c33ae73
parent 301c3f0c
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -275,7 +275,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
    RpcConnectionHeader header;
    if (status == OK) {
        status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header,
                                                sizeof(header));
                                                sizeof(header), {});
        if (status != OK) {
            ALOGE("Failed to read ID for client connecting to RPC server: %s",
                  statusToString(status).c_str());
@@ -288,7 +288,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        if (header.sessionIdSize > 0) {
            sessionId.resize(header.sessionIdSize);
            status = client->interruptableReadFully(server->mShutdownTrigger.get(),
                                                    sessionId.data(), sessionId.size());
                                                    sessionId.data(), sessionId.size(), {});
            if (status != OK) {
                ALOGE("Failed to read session ID for client connecting to RPC server: %s",
                      statusToString(status).c_str());
@@ -313,7 +313,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
            };

            status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response,
                                                     sizeof(response));
                                                     sizeof(response), {});
            if (status != OK) {
                ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
                // still need to cleanup before we can return
+2 −2
Original line number Diff line number Diff line
@@ -560,7 +560,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_
    }

    auto sendHeaderStatus =
            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header), {});
    if (sendHeaderStatus != OK) {
        ALOGE("Could not write connection header to socket: %s",
              statusToString(sendHeaderStatus).c_str());
@@ -570,7 +570,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_
    if (sessionId.size() > 0) {
        auto sendSessionIdStatus =
                server->interruptableWriteFully(mShutdownTrigger.get(), sessionId.data(),
                                                sessionId.size());
                                                sessionId.size(), {});
        if (sendSessionIdStatus != OK) {
            ALOGE("Could not write session ID ('%s') to socket: %s",
                  base::HexString(sessionId.data(), sessionId.size()).c_str(),
+31 −8
Original line number Diff line number Diff line
@@ -307,7 +307,7 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {

status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
                           const sp<RpcSession>& session, const char* what, const void* data,
                           size_t size) {
                           size_t size, const std::function<status_t()>& altPoll) {
    LOG_RPC_DETAIL("Sending %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                   android::base::HexString(data, size).c_str());

@@ -319,7 +319,7 @@ status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,

    if (status_t status =
                connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(),
                                                                  data, size);
                                                                  data, size, altPoll);
        status != OK) {
        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                       connection->rpcTransport.get(), statusToString(status).c_str());
@@ -341,7 +341,7 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection,

    if (status_t status =
                connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(),
                                                                 data, size);
                                                                 data, size, {});
        status != OK) {
        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                       connection->rpcTransport.get(), statusToString(status).c_str());
@@ -519,21 +519,44 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti
    memcpy(transactionData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireTransaction), data.data(),
           data.dataSize());

    constexpr size_t kWaitMaxUs = 1000000;
    constexpr size_t kWaitLogUs = 10000;
    size_t waitUs = 0;

    // Oneway calls have no sync point, so if many are sent before, whether this
    // is a twoway or oneway transaction, they may have filled up the socket.
    // So, make sure we drain them before polling.
    std::function<status_t()> drainRefs = [&] {
        if (waitUs > kWaitLogUs) {
            ALOGE("Cannot send command, trying to process pending refcounts. Waiting %zuus. Too "
                  "many oneway calls?",
                  waitUs);
        }

        if (waitUs > 0) {
            usleep(waitUs);
            waitUs = std::min(kWaitMaxUs, waitUs * 2);
        } else {
            waitUs = 1;
        }

        return drainCommands(connection, session, CommandType::CONTROL_ONLY);
    };

    if (status_t status = rpcSend(connection, session, "transaction", transactionData.data(),
                                  transactionData.size());
        status != OK)
                                  transactionData.size(), drainRefs);
        status != OK) {
        // TODO(b/167966510): need to undo onBinderLeaving - we know the
        // refcount isn't successfully transferred.
        return status;
    }

    if (flags & IBinder::FLAG_ONEWAY) {
        LOG_RPC_DETAIL("Oneway command, so no longer waiting on RpcTransport %p",
                       connection->rpcTransport.get());

        // Do not wait on result.
        // However, too many oneway calls may cause refcounts to build up and fill up the socket,
        // so process those.
        return drainCommands(connection, session, CommandType::CONTROL_ONLY);
        return OK;
    }

    LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction.");
+2 −1
Original line number Diff line number Diff line
@@ -177,7 +177,8 @@ private:

    [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection,
                                   const sp<RpcSession>& session, const char* what,
                                   const void* data, size_t size);
                                   const void* data, size_t size,
                                   const std::function<status_t()>& altPoll = nullptr);
    [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection,
                                  const sp<RpcSession>& session, const char* what, void* data,
                                  size_t size);
+23 −12
Original line number Diff line number Diff line
@@ -46,7 +46,7 @@ public:
    template <typename Buffer, typename SendOrReceive>
    status_t interruptableReadOrWrite(FdTrigger* fdTrigger, Buffer buffer, size_t size,
                                      SendOrReceive sendOrReceiveFun, const char* funName,
                                      int16_t event) {
                                      int16_t event, const std::function<status_t()>& altPoll) {
        const Buffer end = buffer + size;

        MAYBE_WAIT_IN_FLAKE_MODE;
@@ -57,9 +57,8 @@ public:
            return DEAD_OBJECT;
        }

        bool first = true;
        status_t status;
        do {
        bool havePolled = false;
        while (true) {
            ssize_t processSize = TEMP_FAILURE_RETRY(
                    sendOrReceiveFun(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL));

@@ -68,7 +67,8 @@ public:

                // Still return the error on later passes, since it would expose
                // a problem with polling
                if (!first || (first && savedErrno != EAGAIN && savedErrno != EWOULDBLOCK)) {
                if (havePolled ||
                    (!havePolled && savedErrno != EAGAIN && savedErrno != EWOULDBLOCK)) {
                    LOG_RPC_DETAIL("RpcTransport %s(): %s", funName, strerror(savedErrno));
                    return -savedErrno;
                }
@@ -81,19 +81,30 @@ public:
                }
            }

            if (first) first = false;
        } while ((status = fdTrigger->triggerablePoll(mSocket.get(), event)) == OK);
            if (altPoll) {
                if (status_t status = altPoll(); status != OK) return status;
                if (fdTrigger->isTriggered()) {
                    return DEAD_OBJECT;
                }
            } else {
                if (status_t status = fdTrigger->triggerablePoll(mSocket.get(), event);
                    status != OK)
                    return status;
                if (!havePolled) havePolled = true;
            }
        }
    }

    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override {
    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size,
                                     const std::function<status_t()>& altPoll) override {
        return interruptableReadOrWrite(fdTrigger, reinterpret_cast<const uint8_t*>(data), size,
                                        send, "send", POLLOUT);
                                        send, "send", POLLOUT, altPoll);
    }

    status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override {
    status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size,
                                    const std::function<status_t()>& altPoll) override {
        return interruptableReadOrWrite(fdTrigger, reinterpret_cast<uint8_t*>(data), size, recv,
                                        "recv", POLLIN);
                                        "recv", POLLIN, altPoll);
    }

private:
Loading