Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5ae62560 authored by Steven Moreland's avatar Steven Moreland
Browse files

libbinder: RpcState pass connection, not fd

In preparation for better nested commands logic, which requires RpcState
to keep track of when transactions can be nested.

Bug: 167966510
Test: binderRpcTest
Change-Id: Ib1328136bf706c069e0b3c1b8e7c3416d4ff32a7
parent 936fc19a
Loading
Loading
Loading
Loading
+8 −10
Original line number Diff line number Diff line
@@ -108,7 +108,7 @@ sp<IBinder> RpcSession::getRootObject() {
    status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                ConnectionUse::CLIENT, &connection);
    if (status != OK) return nullptr;
    return state()->getRootObject(connection.fd(), sp<RpcSession>::fromExisting(this));
    return state()->getRootObject(connection.get(), sp<RpcSession>::fromExisting(this));
}

status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
@@ -116,7 +116,7 @@ status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
    status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                ConnectionUse::CLIENT, &connection);
    if (status != OK) return status;
    return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
    return state()->getMaxThreads(connection.get(), sp<RpcSession>::fromExisting(this), maxThreads);
}

bool RpcSession::shutdownAndWait(bool wait) {
@@ -146,7 +146,7 @@ status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Pa
                                                                     : ConnectionUse::CLIENT,
                                      &connection);
    if (status != OK) return status;
    return state()->transact(connection.fd(), binder, code, data,
    return state()->transact(connection.get(), binder, code, data,
                             sp<RpcSession>::fromExisting(this), reply, flags);
}

@@ -155,7 +155,7 @@ status_t RpcSession::sendDecStrong(const RpcAddress& address) {
    status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                ConnectionUse::CLIENT_REFCOUNT, &connection);
    if (status != OK) return status;
    return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
    return state()->sendDecStrong(connection.get(), sp<RpcSession>::fromExisting(this), address);
}

std::unique_ptr<RpcSession::FdTrigger> RpcSession::FdTrigger::make() {
@@ -225,7 +225,7 @@ status_t RpcSession::readId() {
                                                ConnectionUse::CLIENT, &connection);
    if (status != OK) return status;

    status = state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id);
    status = state()->getSessionId(connection.get(), sp<RpcSession>::fromExisting(this), &id);
    if (status != OK) return status;

    LOG_RPC_DETAIL("RpcSession %p has id %d", this, id);
@@ -265,8 +265,7 @@ RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
    // be able to do nested calls (we can't only read from it)
    sp<RpcConnection> connection = assignServerToThisThread(std::move(fd));

    status_t status =
            mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
    status_t status = mState->readConnectionInit(connection, sp<RpcSession>::fromExisting(this));

    return PreJoinSetupResult{
            .connection = std::move(connection),
@@ -279,7 +278,7 @@ void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult

    if (setupResult.status == OK) {
        while (true) {
            status_t status = session->state()->getAndExecuteCommand(connection->fd, session,
            status_t status = session->state()->getAndExecuteCommand(connection, session,
                                                                     RpcState::CommandType::ANY);
            if (status != OK) {
                LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
@@ -454,8 +453,7 @@ bool RpcSession::addClientConnection(unique_fd fd) {
        mClientConnections.push_back(connection);
    }

    status_t status =
            mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));
    status_t status = mState->sendConnectionInit(connection, sp<RpcSession>::fromExisting(this));

    {
        std::lock_guard<std::mutex> _l(mMutex);
+85 −69
Original line number Diff line number Diff line
@@ -222,9 +222,11 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {
    mData.reset(new (std::nothrow) uint8_t[size]);
}

status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& session,
                           const char* what, const void* data, size_t size) {
    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
                           const sp<RpcSession>& session, const char* what, const void* data,
                           size_t size) {
    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
                   hexString(data, size).c_str());

    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot send %s at size %zu (too big)", what, size);
@@ -232,12 +234,12 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& sess
        return BAD_VALUE;
    }

    ssize_t sent = TEMP_FAILURE_RETRY(send(fd.get(), data, size, MSG_NOSIGNAL));
    ssize_t sent = TEMP_FAILURE_RETRY(send(connection->fd.get(), data, size, MSG_NOSIGNAL));

    if (sent < 0 || sent != static_cast<ssize_t>(size)) {
        int savedErrno = errno;
        LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
                       size, fd.get(), strerror(savedErrno));
                       size, connection->fd.get(), strerror(savedErrno));

        (void)session->shutdownAndWait(false);
        return -savedErrno;
@@ -246,35 +248,41 @@ status_t RpcState::rpcSend(const base::unique_fd& fd, const sp<RpcSession>& sess
    return OK;
}

status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& session,
                          const char* what, void* data, size_t size) {
status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection,
                          const sp<RpcSession>& session, const char* what, void* data,
                          size_t size) {
    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot rec %s at size %zu (too big)", what, size);
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
    }

    if (status_t status = session->mShutdownTrigger->interruptableReadFully(fd.get(), data, size);
    if (status_t status =
                session->mShutdownTrigger->interruptableReadFully(connection->fd.get(), data, size);
        status != OK) {
        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size, fd.get(),
                       statusToString(status).c_str());
        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size,
                       connection->fd.get(), statusToString(status).c_str());
        return status;
    }

    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, fd.get(), hexString(data, size).c_str());
    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, connection->fd.get(),
                   hexString(data, size).c_str());
    return OK;
}

status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
status_t RpcState::sendConnectionInit(const sp<RpcSession::RpcConnection>& connection,
                                      const sp<RpcSession>& session) {
    RpcClientConnectionInit init{
            .msg = RPC_CONNECTION_INIT_OKAY,
    };
    return rpcSend(fd, session, "connection init", &init, sizeof(init));
    return rpcSend(connection, session, "connection init", &init, sizeof(init));
}

status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
status_t RpcState::readConnectionInit(const sp<RpcSession::RpcConnection>& connection,
                                      const sp<RpcSession>& session) {
    RpcClientConnectionInit init;
    if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK)
    if (status_t status = rpcRec(connection, session, "connection init", &init, sizeof(init));
        status != OK)
        return status;

    static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY));
@@ -286,13 +294,14 @@ status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSes
    return OK;
}

sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
sp<IBinder> RpcState::getRootObject(const sp<RpcSession::RpcConnection>& connection,
                                    const sp<RpcSession>& session) {
    Parcel data;
    data.markForRpc(session);
    Parcel reply;

    status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT, data,
                                      session, &reply, 0);
    status_t status = transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_ROOT,
                                      data, session, &reply, 0);
    if (status != OK) {
        ALOGE("Error getting root object: %s", statusToString(status).c_str());
        return nullptr;
@@ -301,13 +310,14 @@ sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSessi
    return reply.readStrongBinder();
}

status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
                                 size_t* maxThreadsOut) {
status_t RpcState::getMaxThreads(const sp<RpcSession::RpcConnection>& connection,
                                 const sp<RpcSession>& session, size_t* maxThreadsOut) {
    Parcel data;
    data.markForRpc(session);
    Parcel reply;

    status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS,
    status_t status =
            transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_MAX_THREADS,
                            data, session, &reply, 0);
    if (status != OK) {
        ALOGE("Error getting max threads: %s", statusToString(status).c_str());
@@ -326,13 +336,14 @@ status_t RpcState::getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>
    return OK;
}

status_t RpcState::getSessionId(const base::unique_fd& fd, const sp<RpcSession>& session,
                                int32_t* sessionIdOut) {
status_t RpcState::getSessionId(const sp<RpcSession::RpcConnection>& connection,
                                const sp<RpcSession>& session, int32_t* sessionIdOut) {
    Parcel data;
    data.markForRpc(session);
    Parcel reply;

    status_t status = transactAddress(fd, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID,
    status_t status =
            transactAddress(connection, RpcAddress::zero(), RPC_SPECIAL_TRANSACT_GET_SESSION_ID,
                            data, session, &reply, 0);
    if (status != OK) {
        ALOGE("Error getting session ID: %s", statusToString(status).c_str());
@@ -347,9 +358,9 @@ status_t RpcState::getSessionId(const base::unique_fd& fd, const sp<RpcSession>&
    return OK;
}

status_t RpcState::transact(const base::unique_fd& fd, const sp<IBinder>& binder, uint32_t code,
                            const Parcel& data, const sp<RpcSession>& session, Parcel* reply,
                            uint32_t flags) {
status_t RpcState::transact(const sp<RpcSession::RpcConnection>& connection,
                            const sp<IBinder>& binder, uint32_t code, const Parcel& data,
                            const sp<RpcSession>& session, Parcel* reply, uint32_t flags) {
    if (!data.isForRpc()) {
        ALOGE("Refusing to send RPC with parcel not crafted for RPC");
        return BAD_TYPE;
@@ -363,12 +374,12 @@ status_t RpcState::transact(const base::unique_fd& fd, const sp<IBinder>& binder
    RpcAddress address = RpcAddress::zero();
    if (status_t status = onBinderLeaving(session, binder, &address); status != OK) return status;

    return transactAddress(fd, address, code, data, session, reply, flags);
    return transactAddress(connection, address, code, data, session, reply, flags);
}

status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress& address,
                                   uint32_t code, const Parcel& data, const sp<RpcSession>& session,
                                   Parcel* reply, uint32_t flags) {
status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connection,
                                   const RpcAddress& address, uint32_t code, const Parcel& data,
                                   const sp<RpcSession>& session, Parcel* reply, uint32_t flags) {
    LOG_ALWAYS_FATAL_IF(!data.isForRpc());
    LOG_ALWAYS_FATAL_IF(data.objectsCount() != 0);

@@ -418,25 +429,25 @@ status_t RpcState::transactAddress(const base::unique_fd& fd, const RpcAddress&
    memcpy(transactionData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireTransaction), data.data(),
           data.dataSize());

    if (status_t status =
                rpcSend(fd, session, "transaction", transactionData.data(), transactionData.size());
    if (status_t status = rpcSend(connection, session, "transaction", transactionData.data(),
                                  transactionData.size());
        status != OK)
        // TODO(b/167966510): need to undo onBinderLeaving - we know the
        // refcount isn't successfully transferred.
        return status;

    if (flags & IBinder::FLAG_ONEWAY) {
        LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", fd.get());
        LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", connection->fd.get());

        // Do not wait on result.
        // However, too many oneway calls may cause refcounts to build up and fill up the socket,
        // so process those.
        return drainCommands(fd, session, CommandType::CONTROL_ONLY);
        return drainCommands(connection, session, CommandType::CONTROL_ONLY);
    }

    LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction.");

    return waitForReply(fd, session, reply);
    return waitForReply(connection, session, reply);
}

static void cleanup_reply_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -448,17 +459,18 @@ static void cleanup_reply_data(Parcel* p, const uint8_t* data, size_t dataSize,
    LOG_ALWAYS_FATAL_IF(objectsCount, 0);
}

status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>& session,
                                Parcel* reply) {
status_t RpcState::waitForReply(const sp<RpcSession::RpcConnection>& connection,
                                const sp<RpcSession>& session, Parcel* reply) {
    RpcWireHeader command;
    while (true) {
        if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
        if (status_t status =
                    rpcRec(connection, session, "command header", &command, sizeof(command));
            status != OK)
            return status;

        if (command.command == RPC_COMMAND_REPLY) break;

        if (status_t status = processServerCommand(fd, session, command, CommandType::ANY);
        if (status_t status = processServerCommand(connection, session, command, CommandType::ANY);
            status != OK)
            return status;
    }
@@ -466,7 +478,7 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
    CommandData data(command.bodySize);
    if (!data.valid()) return NO_MEMORY;

    if (status_t status = rpcRec(fd, session, "reply body", data.data(), command.bodySize);
    if (status_t status = rpcRec(connection, session, "reply body", data.data(), command.bodySize);
        status != OK)
        return status;

@@ -488,8 +500,8 @@ status_t RpcState::waitForReply(const base::unique_fd& fd, const sp<RpcSession>&
    return OK;
}

status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
                                 const RpcAddress& addr) {
status_t RpcState::sendDecStrong(const sp<RpcSession::RpcConnection>& connection,
                                 const sp<RpcSession>& session, const RpcAddress& addr) {
    {
        std::lock_guard<std::mutex> _l(mNodeMutex);
        if (mTerminated) return DEAD_OBJECT; // avoid fatal only, otherwise races
@@ -508,39 +520,42 @@ status_t RpcState::sendDecStrong(const base::unique_fd& fd, const sp<RpcSession>
            .command = RPC_COMMAND_DEC_STRONG,
            .bodySize = sizeof(RpcWireAddress),
    };
    if (status_t status = rpcSend(fd, session, "dec ref header", &cmd, sizeof(cmd)); status != OK)
    if (status_t status = rpcSend(connection, session, "dec ref header", &cmd, sizeof(cmd));
        status != OK)
        return status;
    if (status_t status = rpcSend(fd, session, "dec ref body", &addr.viewRawEmbedded(),
    if (status_t status = rpcSend(connection, session, "dec ref body", &addr.viewRawEmbedded(),
                                  sizeof(RpcWireAddress));
        status != OK)
        return status;
    return OK;
}

status_t RpcState::getAndExecuteCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
                                        CommandType type) {
    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", fd.get());
status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
                                        const sp<RpcSession>& session, CommandType type) {
    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", connection->fd.get());

    RpcWireHeader command;
    if (status_t status = rpcRec(fd, session, "command header", &command, sizeof(command));
    if (status_t status = rpcRec(connection, session, "command header", &command, sizeof(command));
        status != OK)
        return status;

    return processServerCommand(fd, session, command, type);
    return processServerCommand(connection, session, command, type);
}

status_t RpcState::drainCommands(const base::unique_fd& fd, const sp<RpcSession>& session,
                                 CommandType type) {
status_t RpcState::drainCommands(const sp<RpcSession::RpcConnection>& connection,
                                 const sp<RpcSession>& session, CommandType type) {
    uint8_t buf;
    while (0 < TEMP_FAILURE_RETRY(recv(fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
        status_t status = getAndExecuteCommand(fd, session, type);
    while (0 < TEMP_FAILURE_RETRY(
                       recv(connection->fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
        status_t status = getAndExecuteCommand(connection, session, type);
        if (status != OK) return status;
    }
    return OK;
}

status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcSession>& session,
                                        const RpcWireHeader& command, CommandType type) {
status_t RpcState::processServerCommand(const sp<RpcSession::RpcConnection>& connection,
                                        const sp<RpcSession>& session, const RpcWireHeader& command,
                                        CommandType type) {
    IPCThreadState* kernelBinderState = IPCThreadState::selfOrNull();
    IPCThreadState::SpGuard spGuard{
            .address = __builtin_frame_address(0),
@@ -559,9 +574,9 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
    switch (command.command) {
        case RPC_COMMAND_TRANSACT:
            if (type != CommandType::ANY) return BAD_TYPE;
            return processTransact(fd, session, command);
            return processTransact(connection, session, command);
        case RPC_COMMAND_DEC_STRONG:
            return processDecStrong(fd, session, command);
            return processDecStrong(connection, session, command);
    }

    // We should always know the version of the opposing side, and since the
@@ -573,20 +588,20 @@ status_t RpcState::processServerCommand(const base::unique_fd& fd, const sp<RpcS
    (void)session->shutdownAndWait(false);
    return DEAD_OBJECT;
}
status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSession>& session,
                                   const RpcWireHeader& command) {
status_t RpcState::processTransact(const sp<RpcSession::RpcConnection>& connection,
                                   const sp<RpcSession>& session, const RpcWireHeader& command) {
    LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_TRANSACT, "command: %d", command.command);

    CommandData transactionData(command.bodySize);
    if (!transactionData.valid()) {
        return NO_MEMORY;
    }
    if (status_t status = rpcRec(fd, session, "transaction body", transactionData.data(),
    if (status_t status = rpcRec(connection, session, "transaction body", transactionData.data(),
                                 transactionData.size());
        status != OK)
        return status;

    return processTransactInternal(fd, session, std::move(transactionData));
    return processTransactInternal(connection, session, std::move(transactionData));
}

static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -598,7 +613,8 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d
    (void)objectsCount;
}

status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session,
status_t RpcState::processTransactInternal(const sp<RpcSession::RpcConnection>& connection,
                                           const sp<RpcSession>& session,
                                           CommandData transactionData) {
    // for 'recursive' calls to this, we have already read and processed the
    // binder from the transaction data and taken reference counts into account,
@@ -811,11 +827,11 @@ processTransactInternalTailCall:
    memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
           reply.dataSize());

    return rpcSend(fd, session, "reply", replyData.data(), replyData.size());
    return rpcSend(connection, session, "reply", replyData.data(), replyData.size());
}

status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSession>& session,
                                    const RpcWireHeader& command) {
status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connection,
                                    const sp<RpcSession>& session, const RpcWireHeader& command) {
    LOG_ALWAYS_FATAL_IF(command.command != RPC_COMMAND_DEC_STRONG, "command: %d", command.command);

    CommandData commandData(command.bodySize);
@@ -823,7 +839,7 @@ status_t RpcState::processDecStrong(const base::unique_fd& fd, const sp<RpcSessi
        return NO_MEMORY;
    }
    if (status_t status =
                rpcRec(fd, session, "dec ref body", commandData.data(), commandData.size());
                rpcRec(connection, session, "dec ref body", commandData.data(), commandData.size());
        status != OK)
        return status;

+35 −29

File changed.

Preview size limit exceeded, changes collapsed.

+1 −1
Original line number Diff line number Diff line
@@ -237,7 +237,7 @@ private:
                             ExclusiveConnection* connection);

        ~ExclusiveConnection();
        const base::unique_fd& fd() { return mConnection->fd; }
        const sp<RpcConnection>& get() { return mConnection; }

    private:
        static void findConnection(pid_t tid, sp<RpcConnection>* exclusive,