Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2c25d940 authored by Treehugger Robot's avatar Treehugger Robot Committed by Automerger Merge Worker
Browse files

Merge "binder: Eliminate a data copy in RPC transport operations" am:...

Merge "binder: Eliminate a data copy in RPC transport operations" am: 7dc506f2 am: 8e5a9c89 am: e627a40c

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1917958

Change-Id: Idd582e849674945da732d57b004989fade91cddb
parents 2692efa2 e627a40c
Loading
Loading
Loading
Loading
+7 −6
Original line number Diff line number Diff line
@@ -287,8 +287,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie

    RpcConnectionHeader header;
    if (status == OK) {
        status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header,
                                                sizeof(header), {});
        iovec iov{&header, sizeof(header)};
        status = client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, {});
        if (status != OK) {
            ALOGE("Failed to read ID for client connecting to RPC server: %s",
                  statusToString(status).c_str());
@@ -301,8 +301,9 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        if (header.sessionIdSize > 0) {
            if (header.sessionIdSize == kSessionIdBytes) {
                sessionId.resize(header.sessionIdSize);
                status = client->interruptableReadFully(server->mShutdownTrigger.get(),
                                                        sessionId.data(), sessionId.size(), {});
                iovec iov{sessionId.data(), sessionId.size()};
                status =
                        client->interruptableReadFully(server->mShutdownTrigger.get(), &iov, 1, {});
                if (status != OK) {
                    ALOGE("Failed to read session ID for client connecting to RPC server: %s",
                          statusToString(status).c_str());
@@ -331,8 +332,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
                    .version = protocolVersion,
            };

            status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response,
                                                     sizeof(response), {});
            iovec iov{&response, sizeof(response)};
            status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &iov, 1, {});
            if (status != OK) {
                ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
                // still need to cleanup before we can return
+5 −3
Original line number Diff line number Diff line
@@ -615,8 +615,9 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_
        header.options |= RPC_CONNECTION_OPTION_INCOMING;
    }

    iovec headerIov{&header, sizeof(header)};
    auto sendHeaderStatus =
            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header), {});
            server->interruptableWriteFully(mShutdownTrigger.get(), &headerIov, 1, {});
    if (sendHeaderStatus != OK) {
        ALOGE("Could not write connection header to socket: %s",
              statusToString(sendHeaderStatus).c_str());
@@ -624,9 +625,10 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_
    }

    if (sessionId.size() > 0) {
        iovec sessionIov{const_cast<void*>(static_cast<const void*>(sessionId.data())),
                         sessionId.size()};
        auto sendSessionIdStatus =
                server->interruptableWriteFully(mShutdownTrigger.get(), sessionId.data(),
                                                sessionId.size(), {});
                server->interruptableWriteFully(mShutdownTrigger.get(), &sessionIov, 1, {});
        if (sendSessionIdStatus != OK) {
            ALOGE("Could not write session ID ('%s') to socket: %s",
                  base::HexString(sessionId.data(), sessionId.size()).c_str(),
+47 −68
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@
#include "RpcState.h"

#include <android-base/hex.h>
#include <android-base/macros.h>
#include <android-base/scopeguard.h>
#include <binder/BpBinder.h>
#include <binder/IPCThreadState.h>
@@ -309,22 +310,18 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {
}

status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
                           const sp<RpcSession>& session, const char* what, const void* data,
                           size_t size, const std::function<status_t()>& altPoll) {
                           const sp<RpcSession>& session, const char* what, iovec* iovs,
                           size_t niovs, const std::function<status_t()>& altPoll) {
    for (size_t i = 0; i < niovs; i++) {
        LOG_RPC_DETAIL("Sending %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                   android::base::HexString(data, size).c_str());

    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot send %s at size %zu (too big)", what, size);
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
                       android::base::HexString(iovs[i].iov_base, iovs[i].iov_len).c_str());
    }

    if (status_t status =
                connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(),
                                                                  data, size, altPoll);
                                                                  iovs, niovs, altPoll);
        status != OK) {
        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
        LOG_RPC_DETAIL("Failed to write %s (%zu iovs) on RpcTransport %p, error: %s", what, niovs,
                       connection->rpcTransport.get(), statusToString(status).c_str());
        (void)session->shutdownAndWait(false);
        return status;
@@ -334,34 +331,30 @@ status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
}

status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection,
                          const sp<RpcSession>& session, const char* what, void* data,
                          size_t size) {
    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot rec %s at size %zu (too big)", what, size);
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
    }

                          const sp<RpcSession>& session, const char* what, iovec* iovs,
                          size_t niovs) {
    if (status_t status =
                connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(),
                                                                 data, size, {});
                                                                 iovs, niovs, {});
        status != OK) {
        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
        LOG_RPC_DETAIL("Failed to read %s (%zu iovs) on RpcTransport %p, error: %s", what, niovs,
                       connection->rpcTransport.get(), statusToString(status).c_str());
        (void)session->shutdownAndWait(false);
        return status;
    }

    for (size_t i = 0; i < niovs; i++) {
        LOG_RPC_DETAIL("Received %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                   android::base::HexString(data, size).c_str());
                       android::base::HexString(iovs[i].iov_base, iovs[i].iov_len).c_str());
    }
    return OK;
}

status_t RpcState::readNewSessionResponse(const sp<RpcSession::RpcConnection>& connection,
                                          const sp<RpcSession>& session, uint32_t* version) {
    RpcNewSessionResponse response;
    if (status_t status =
                rpcRec(connection, session, "new session response", &response, sizeof(response));
    iovec iov{&response, sizeof(response)};
    if (status_t status = rpcRec(connection, session, "new session response", &iov, 1);
        status != OK) {
        return status;
    }
@@ -374,14 +367,15 @@ status_t RpcState::sendConnectionInit(const sp<RpcSession::RpcConnection>& conne
    RpcOutgoingConnectionInit init{
            .msg = RPC_CONNECTION_INIT_OKAY,
    };
    return rpcSend(connection, session, "connection init", &init, sizeof(init));
    iovec iov{&init, sizeof(init)};
    return rpcSend(connection, session, "connection init", &iov, 1);
}

status_t RpcState::readConnectionInit(const sp<RpcSession::RpcConnection>& connection,
                                      const sp<RpcSession>& session) {
    RpcOutgoingConnectionInit init;
    if (status_t status = rpcRec(connection, session, "connection init", &init, sizeof(init));
        status != OK)
    iovec iov{&init, sizeof(init)};
    if (status_t status = rpcRec(connection, session, "connection init", &iov, 1); status != OK)
        return status;

    static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY));
@@ -514,17 +508,6 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti
            .flags = flags,
            .asyncNumber = asyncNumber,
    };
    CommandData transactionData(sizeof(RpcWireHeader) + sizeof(RpcWireTransaction) +
                                data.dataSize());
    if (!transactionData.valid()) {
        return NO_MEMORY;
    }

    memcpy(transactionData.data() + 0, &command, sizeof(RpcWireHeader));
    memcpy(transactionData.data() + sizeof(RpcWireHeader), &transaction,
           sizeof(RpcWireTransaction));
    memcpy(transactionData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireTransaction), data.data(),
           data.dataSize());

    constexpr size_t kWaitMaxUs = 1000000;
    constexpr size_t kWaitLogUs = 10000;
@@ -550,8 +533,13 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti
        return drainCommands(connection, session, CommandType::CONTROL_ONLY);
    };

    if (status_t status = rpcSend(connection, session, "transaction", transactionData.data(),
                                  transactionData.size(), drainRefs);
    iovec iovs[]{
            {&command, sizeof(RpcWireHeader)},
            {&transaction, sizeof(RpcWireTransaction)},
            {const_cast<uint8_t*>(data.data()), data.dataSize()},
    };
    if (status_t status =
                rpcSend(connection, session, "transaction", iovs, arraysize(iovs), drainRefs);
        status != OK) {
        // TODO(b/167966510): need to undo onBinderLeaving - we know the
        // refcount isn't successfully transferred.
@@ -584,8 +572,8 @@ status_t RpcState::waitForReply(const sp<RpcSession::RpcConnection>& connection,
                                const sp<RpcSession>& session, Parcel* reply) {
    RpcWireHeader command;
    while (true) {
        if (status_t status = rpcRec(connection, session, "command header (for reply)", &command,
                                     sizeof(command));
        iovec iov{&command, sizeof(command)};
        if (status_t status = rpcRec(connection, session, "command header (for reply)", &iov, 1);
            status != OK)
            return status;

@@ -599,8 +587,8 @@ status_t RpcState::waitForReply(const sp<RpcSession::RpcConnection>& connection,
    CommandData data(command.bodySize);
    if (!data.valid()) return NO_MEMORY;

    if (status_t status = rpcRec(connection, session, "reply body", data.data(), command.bodySize);
        status != OK)
    iovec iov{data.data(), command.bodySize};
    if (status_t status = rpcRec(connection, session, "reply body", &iov, 1); status != OK)
        return status;

    if (command.bodySize < sizeof(RpcWireReply)) {
@@ -653,11 +641,8 @@ status_t RpcState::sendDecStrongToTarget(const sp<RpcSession::RpcConnection>& co
            .command = RPC_COMMAND_DEC_STRONG,
            .bodySize = sizeof(RpcDecStrong),
    };
    if (status_t status = rpcSend(connection, session, "dec ref header", &cmd, sizeof(cmd));
        status != OK)
        return status;

    return rpcSend(connection, session, "dec ref body", &body, sizeof(body));
    iovec iovs[]{{&cmd, sizeof(cmd)}, {&body, sizeof(body)}};
    return rpcSend(connection, session, "dec ref", iovs, arraysize(iovs));
}

status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
@@ -665,8 +650,8 @@ status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& con
    LOG_RPC_DETAIL("getAndExecuteCommand on RpcTransport %p", connection->rpcTransport.get());

    RpcWireHeader command;
    if (status_t status = rpcRec(connection, session, "command header (for server)", &command,
                                 sizeof(command));
    iovec iov{&command, sizeof(command)};
    if (status_t status = rpcRec(connection, session, "command header (for server)", &iov, 1);
        status != OK)
        return status;

@@ -726,9 +711,8 @@ status_t RpcState::processTransact(const sp<RpcSession::RpcConnection>& connecti
    if (!transactionData.valid()) {
        return NO_MEMORY;
    }
    if (status_t status = rpcRec(connection, session, "transaction body", transactionData.data(),
                                 transactionData.size());
        status != OK)
    iovec iov{transactionData.data(), transactionData.size()};
    if (status_t status = rpcRec(connection, session, "transaction body", &iov, 1); status != OK)
        return status;

    return processTransactInternal(connection, session, std::move(transactionData));
@@ -965,16 +949,12 @@ processTransactInternalTailCall:
            .status = replyStatus,
    };

    CommandData replyData(sizeof(RpcWireHeader) + sizeof(RpcWireReply) + reply.dataSize());
    if (!replyData.valid()) {
        return NO_MEMORY;
    }
    memcpy(replyData.data() + 0, &cmdReply, sizeof(RpcWireHeader));
    memcpy(replyData.data() + sizeof(RpcWireHeader), &rpcReply, sizeof(RpcWireReply));
    memcpy(replyData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireReply), reply.data(),
           reply.dataSize());

    return rpcSend(connection, session, "reply", replyData.data(), replyData.size());
    iovec iovs[]{
            {&cmdReply, sizeof(RpcWireHeader)},
            {&rpcReply, sizeof(RpcWireReply)},
            {const_cast<uint8_t*>(reply.data()), reply.dataSize()},
    };
    return rpcSend(connection, session, "reply", iovs, arraysize(iovs));
}

status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connection,
@@ -985,9 +965,8 @@ status_t RpcState::processDecStrong(const sp<RpcSession::RpcConnection>& connect
    if (!commandData.valid()) {
        return NO_MEMORY;
    }
    if (status_t status =
                rpcRec(connection, session, "dec ref body", commandData.data(), commandData.size());
        status != OK)
    iovec iov{commandData.data(), commandData.size()};
    if (status_t status = rpcRec(connection, session, "dec ref body", &iov, 1); status != OK)
        return status;

    if (command.bodySize != sizeof(RpcDecStrong)) {
+6 −4
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@
#include <optional>
#include <queue>

#include <sys/uio.h>

namespace android {

struct RpcWireHeader;
@@ -177,12 +179,12 @@ private:
    };

    [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection,
                                   const sp<RpcSession>& session, const char* what,
                                   const void* data, size_t size,
                                   const sp<RpcSession>& session, const char* what, iovec* iovs,
                                   size_t niovs,
                                   const std::function<status_t()>& altPoll = nullptr);
    [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection,
                                  const sp<RpcSession>& session, const char* what, void* data,
                                  size_t size);
                                  const sp<RpcSession>& session, const char* what, iovec* iovs,
                                  size_t niovs);

    [[nodiscard]] status_t waitForReply(const sp<RpcSession::RpcConnection>& connection,
                                        const sp<RpcSession>& session, Parcel* reply);
+49 −16
Original line number Diff line number Diff line
@@ -43,12 +43,10 @@ public:
        return ret;
    }

    template <typename Buffer, typename SendOrReceive>
    status_t interruptableReadOrWrite(FdTrigger* fdTrigger, Buffer buffer, size_t size,
    template <typename SendOrReceive>
    status_t interruptableReadOrWrite(FdTrigger* fdTrigger, iovec* iovs, size_t niovs,
                                      SendOrReceive sendOrReceiveFun, const char* funName,
                                      int16_t event, const std::function<status_t()>& altPoll) {
        const Buffer end = buffer + size;

        MAYBE_WAIT_IN_FLAKE_MODE;

        // Since we didn't poll, we need to manually check to see if it was triggered. Otherwise, we
@@ -57,26 +55,61 @@ public:
            return DEAD_OBJECT;
        }

        // If iovs has one or more empty vectors at the end and
        // we somehow advance past all the preceding vectors and
        // pass some or all of the empty ones to sendmsg/recvmsg,
        // the call will return processSize == 0. In that case
        // we should be returning OK but instead return DEAD_OBJECT.
        // To avoid this problem, we make sure here that the last
        // vector at iovs[niovs - 1] has a non-zero length.
        while (niovs > 0 && iovs[niovs - 1].iov_len == 0) {
            niovs--;
        }
        if (niovs == 0) {
            // The vectors are all empty, so we have nothing to send.
            return OK;
        }

        bool havePolled = false;
        while (true) {
            ssize_t processSize = TEMP_FAILURE_RETRY(
                    sendOrReceiveFun(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL));
            msghdr msg{
                    .msg_iov = iovs,
                    .msg_iovlen = niovs,
            };
            ssize_t processSize =
                    TEMP_FAILURE_RETRY(sendOrReceiveFun(mSocket.get(), &msg, MSG_NOSIGNAL));

            if (processSize < 0) {
                int savedErrno = errno;

                // Still return the error on later passes, since it would expose
                // a problem with polling
                if (havePolled ||
                    (!havePolled && savedErrno != EAGAIN && savedErrno != EWOULDBLOCK)) {
                if (havePolled || (savedErrno != EAGAIN && savedErrno != EWOULDBLOCK)) {
                    LOG_RPC_DETAIL("RpcTransport %s(): %s", funName, strerror(savedErrno));
                    return -savedErrno;
                }
            } else if (processSize == 0) {
                return DEAD_OBJECT;
            } else {
                buffer += processSize;
                if (buffer == end) {
                while (processSize > 0 && niovs > 0) {
                    auto& iov = iovs[0];
                    if (static_cast<size_t>(processSize) < iov.iov_len) {
                        // Advance the base of the current iovec
                        iov.iov_base = reinterpret_cast<char*>(iov.iov_base) + processSize;
                        iov.iov_len -= processSize;
                        break;
                    }

                    // The current iovec was fully written
                    processSize -= iov.iov_len;
                    iovs++;
                    niovs--;
                }
                if (niovs == 0) {
                    LOG_ALWAYS_FATAL_IF(processSize > 0,
                                        "Reached the end of iovecs "
                                        "with %zd bytes remaining",
                                        processSize);
                    return OK;
                }
            }
@@ -95,16 +128,16 @@ public:
        }
    }

    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size,
    status_t interruptableWriteFully(FdTrigger* fdTrigger, iovec* iovs, size_t niovs,
                                     const std::function<status_t()>& altPoll) override {
        return interruptableReadOrWrite(fdTrigger, reinterpret_cast<const uint8_t*>(data), size,
                                        send, "send", POLLOUT, altPoll);
        return interruptableReadOrWrite(fdTrigger, iovs, niovs, sendmsg, "sendmsg", POLLOUT,
                                        altPoll);
    }

    status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size,
    status_t interruptableReadFully(FdTrigger* fdTrigger, iovec* iovs, size_t niovs,
                                    const std::function<status_t()>& altPoll) override {
        return interruptableReadOrWrite(fdTrigger, reinterpret_cast<uint8_t*>(data), size, recv,
                                        "recv", POLLIN, altPoll);
        return interruptableReadOrWrite(fdTrigger, iovs, niovs, recvmsg, "recvmsg", POLLIN,
                                        altPoll);
    }

private:
Loading