
Commit 7091cba2 authored by Steven Moreland, committed by Automerger Merge Worker

Merge changes from topic "libbinder-rpc-perf-poll" am: b37acedb am: 708f9aa0

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1838047

Change-Id: I199efc331d22b94811dd165e3e30b409f53ebbbe
parents fbe52e4c 708f9aa0
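
For orientation before the per-file diffs: the topic threads an optional altPoll callback through the blocking socket read/write helpers, so a caller can make progress elsewhere (in this change, draining queued oneway refcount commands) whenever a transfer would block, instead of sleeping in poll(). The sketch below only illustrates that contract; pollUntilReady and waitOrDoAlternativeWork are simplified stand-ins, not the libbinder API.

// Sketch only: illustrates the altPoll contract, not the actual libbinder code.
#include <cerrno>
#include <functional>
#include <poll.h>

using status_t = int;
constexpr status_t OK = 0;

// Stand-in for FdTrigger::triggerablePoll(): block until fd is ready for `events`.
static status_t pollUntilReady(int fd, short events) {
    pollfd pfd{fd, events, 0};
    if (poll(&pfd, 1, /*timeout ms*/ -1) < 0) return -errno;
    return OK;
}

// What the transport does when the socket is not ready: an empty std::function
// (the {} passed at most call sites below) means "just poll as before"; a
// non-empty callback is run instead, and the non-blocking send/recv is retried.
static status_t waitOrDoAlternativeWork(int fd, short events,
                                        const std::function<status_t()>& altPoll) {
    if (altPoll) return altPoll();
    return pollUntilReady(fd, events);
}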
RpcServer.cpp +3 −3
@@ -278,7 +278,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
     RpcConnectionHeader header;
     if (status == OK) {
         status = client->interruptableReadFully(server->mShutdownTrigger.get(), &header,
-                                                sizeof(header));
+                                                sizeof(header), {});
         if (status != OK) {
             ALOGE("Failed to read ID for client connecting to RPC server: %s",
                   statusToString(status).c_str());
@@ -291,7 +291,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
         if (header.sessionIdSize > 0) {
             sessionId.resize(header.sessionIdSize);
             status = client->interruptableReadFully(server->mShutdownTrigger.get(),
-                                                    sessionId.data(), sessionId.size());
+                                                    sessionId.data(), sessionId.size(), {});
             if (status != OK) {
                 ALOGE("Failed to read session ID for client connecting to RPC server: %s",
                       statusToString(status).c_str());
@@ -316,7 +316,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
             };

             status = client->interruptableWriteFully(server->mShutdownTrigger.get(), &response,
-                                                     sizeof(response));
+                                                     sizeof(response), {});
             if (status != OK) {
                 ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
                 // still need to cleanup before we can return
RpcSession.cpp +2 −2
@@ -560,7 +560,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_
     }

     auto sendHeaderStatus =
-            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header));
+            server->interruptableWriteFully(mShutdownTrigger.get(), &header, sizeof(header), {});
     if (sendHeaderStatus != OK) {
         ALOGE("Could not write connection header to socket: %s",
               statusToString(sendHeaderStatus).c_str());
@@ -570,7 +570,7 @@ status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_
     if (sessionId.size() > 0) {
         auto sendSessionIdStatus =
                 server->interruptableWriteFully(mShutdownTrigger.get(), sessionId.data(),
-                                                sessionId.size());
+                                                sessionId.size(), {});
         if (sendSessionIdStatus != OK) {
             ALOGE("Could not write session ID ('%s') to socket: %s",
                   base::HexString(sessionId.data(), sessionId.size()).c_str(),
RpcState.cpp +35 −14
@@ -307,7 +307,7 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {

 status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
                            const sp<RpcSession>& session, const char* what, const void* data,
-                           size_t size) {
+                           size_t size, const std::function<status_t()>& altPoll) {
     LOG_RPC_DETAIL("Sending %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                    android::base::HexString(data, size).c_str());

@@ -319,7 +319,7 @@ status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,

     if (status_t status =
                 connection->rpcTransport->interruptableWriteFully(session->mShutdownTrigger.get(),
-                                                                  data, size);
+                                                                  data, size, altPoll);
         status != OK) {
         LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                        connection->rpcTransport.get(), statusToString(status).c_str());
@@ -341,7 +341,7 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection,

     if (status_t status =
                 connection->rpcTransport->interruptableReadFully(session->mShutdownTrigger.get(),
-                                                                 data, size);
+                                                                 data, size, {});
         status != OK) {
         LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                        connection->rpcTransport.get(), statusToString(status).c_str());
@@ -523,21 +523,44 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti
     memcpy(transactionData.data() + sizeof(RpcWireHeader) + sizeof(RpcWireTransaction), data.data(),
            data.dataSize());

+    constexpr size_t kWaitMaxUs = 1000000;
+    constexpr size_t kWaitLogUs = 10000;
+    size_t waitUs = 0;
+
+    // Oneway calls have no sync point, so if many are sent before, whether this
+    // is a twoway or oneway transaction, they may have filled up the socket.
+    // So, make sure we drain them before polling.
+    std::function<status_t()> drainRefs = [&] {
+        if (waitUs > kWaitLogUs) {
+            ALOGE("Cannot send command, trying to process pending refcounts. Waiting %zuus. Too "
+                  "many oneway calls?",
+                  waitUs);
+        }
+
+        if (waitUs > 0) {
+            usleep(waitUs);
+            waitUs = std::min(kWaitMaxUs, waitUs * 2);
+        } else {
+            waitUs = 1;
+        }
+
+        return drainCommands(connection, session, CommandType::CONTROL_ONLY);
+    };
+
     if (status_t status = rpcSend(connection, session, "transaction", transactionData.data(),
-                                  transactionData.size());
-        status != OK)
+                                  transactionData.size(), drainRefs);
+        status != OK) {
         // TODO(b/167966510): need to undo onBinderLeaving - we know the
         // refcount isn't successfully transferred.
         return status;
+    }

     if (flags & IBinder::FLAG_ONEWAY) {
         LOG_RPC_DETAIL("Oneway command, so no longer waiting on RpcTransport %p",
                        connection->rpcTransport.get());

         // Do not wait on result.
-        return OK;
+        // However, too many oneway calls may cause refcounts to build up and fill up the socket,
+        // so process those.
+        return drainCommands(connection, session, CommandType::CONTROL_ONLY);
     }

     LOG_ALWAYS_FATAL_IF(reply == nullptr, "Reply parcel must be used for synchronous transaction.");
@@ -723,7 +746,7 @@ status_t RpcState::processTransactInternal(const sp<RpcSession::RpcConnection>&
     // for 'recursive' calls to this, we have already read and processed the
     // binder from the transaction data and taken reference counts into account,
     // so it is cached here.
-    sp<IBinder> targetRef;
+    sp<IBinder> target;
 processTransactInternalTailCall:

     if (transactionData.size() < sizeof(RpcWireTransaction)) {
@@ -738,12 +761,9 @@ processTransactInternalTailCall:
     bool oneway = transaction->flags & IBinder::FLAG_ONEWAY;

     status_t replyStatus = OK;
-    sp<IBinder> target;
     if (addr != 0) {
-        if (!targetRef) {
+        if (!target) {
             replyStatus = onBinderEntering(session, addr, &target);
-        } else {
-            target = targetRef;
         }

         if (replyStatus != OK) {
@@ -910,7 +930,8 @@ processTransactInternalTailCall:

                 // reset up arguments
                 transactionData = std::move(todo.data);
-                targetRef = std::move(todo.ref);
+                LOG_ALWAYS_FATAL_IF(target != todo.ref,
+                                    "async list should be associated with a binder");

                 it->second.asyncTodo.pop();
                 goto processTransactInternalTailCall;
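
The drainRefs lambda in the transactAddress hunk above combines draining with exponential backoff: no sleep on the first retry, then doubling waits capped at one second, with a warning once waits pass ten milliseconds. Below is a standalone sketch of just that backoff shape; onWouldBlock is an illustrative name, and the real drainCommands() call is reduced to a comment.

// Sketch of the backoff only; the real callback also calls RpcState::drainCommands().
#include <algorithm>
#include <cstddef>
#include <cstdio>
#include <unistd.h>

int main() {
    constexpr size_t kWaitMaxUs = 1000000;  // never sleep longer than 1 s per retry
    constexpr size_t kWaitLogUs = 10000;    // start warning once waits exceed 10 ms
    size_t waitUs = 0;

    // Each invocation sleeps for the current wait and doubles the next one, so a
    // burst of blocked sends backs off quickly without busy-looping.
    auto onWouldBlock = [&] {
        if (waitUs > kWaitLogUs) {
            printf("cannot send yet, waiting %zuus; too many oneway calls?\n", waitUs);
        }
        if (waitUs > 0) {
            usleep(waitUs);
            waitUs = std::min(kWaitMaxUs, waitUs * 2);
        } else {
            waitUs = 1;
        }
        // Real code: return drainCommands(connection, session, CommandType::CONTROL_ONLY);
    };

    for (int i = 0; i < 16; ++i) onWouldBlock();  // waits grow 0, 1, 2, 4, ... us
    printf("next wait would be %zuus\n", waitUs);
}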
RpcState.h +2 −1
@@ -177,7 +177,8 @@ private:

     [[nodiscard]] status_t rpcSend(const sp<RpcSession::RpcConnection>& connection,
                                    const sp<RpcSession>& session, const char* what,
-                                   const void* data, size_t size);
+                                   const void* data, size_t size,
+                                   const std::function<status_t()>& altPoll = nullptr);
     [[nodiscard]] status_t rpcRec(const sp<RpcSession::RpcConnection>& connection,
                                   const sp<RpcSession>& session, const char* what, void* data,
                                   size_t size);
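
One note on the altPoll = nullptr default above: a std::function constructed from nullptr is empty and converts to false, so existing rpcSend() call sites that omit the argument keep the plain polling path. A tiny self-contained check of that behaviour (not libbinder code):

#include <cassert>
#include <functional>

int main() {
    std::function<int()> altPoll = nullptr;  // same shape as the defaulted parameter
    assert(!altPoll);                        // empty: transport falls back to plain poll
    altPoll = [] { return 0; };
    assert(altPoll && altPoll() == 0);       // non-empty: invoked instead of polling
}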
RpcTransportRaw.cpp +53 −37
@@ -43,56 +43,72 @@ public:
         return ret;
     }

-    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size) override {
-        const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
-        const uint8_t* end = buffer + size;
+    template <typename Buffer, typename SendOrReceive>
+    status_t interruptableReadOrWrite(FdTrigger* fdTrigger, Buffer buffer, size_t size,
+                                      SendOrReceive sendOrReceiveFun, const char* funName,
+                                      int16_t event, const std::function<status_t()>& altPoll) {
+        const Buffer end = buffer + size;

         MAYBE_WAIT_IN_FLAKE_MODE;

-        status_t status;
-        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLOUT)) == OK) {
-            ssize_t writeSize =
-                    TEMP_FAILURE_RETRY(::send(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL));
-            if (writeSize < 0) {
-                int savedErrno = errno;
-                LOG_RPC_DETAIL("RpcTransport send(): %s", strerror(savedErrno));
-                return -savedErrno;
-            }
-
-            if (writeSize == 0) return DEAD_OBJECT;
-
-            buffer += writeSize;
-            if (buffer == end) return OK;
-        }
-        return status;
-    }
-
-    status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size) override {
-        uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
-        uint8_t* end = buffer + size;
-
-        MAYBE_WAIT_IN_FLAKE_MODE;
-
-        status_t status;
-        while ((status = fdTrigger->triggerablePoll(mSocket.get(), POLLIN)) == OK) {
-            ssize_t readSize =
-                    TEMP_FAILURE_RETRY(::recv(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL));
-            if (readSize < 0) {
-                int savedErrno = errno;
-                LOG_RPC_DETAIL("RpcTransport recv(): %s", strerror(savedErrno));
-                return -savedErrno;
-            }
-
-            if (readSize == 0) return DEAD_OBJECT; // EOF
-
-            buffer += readSize;
-            if (buffer == end) return OK;
-        }
-        return status;
-    }
+        // Since we didn't poll, we need to manually check to see if it was triggered. Otherwise, we
+        // may never know we should be shutting down.
+        if (fdTrigger->isTriggered()) {
+            return DEAD_OBJECT;
+        }
+
+        bool havePolled = false;
+        while (true) {
+            ssize_t processSize = TEMP_FAILURE_RETRY(
+                    sendOrReceiveFun(mSocket.get(), buffer, end - buffer, MSG_NOSIGNAL));
+
+            if (processSize < 0) {
+                int savedErrno = errno;
+
+                // Still return the error on later passes, since it would expose
+                // a problem with polling
+                if (havePolled ||
+                    (!havePolled && savedErrno != EAGAIN && savedErrno != EWOULDBLOCK)) {
+                    LOG_RPC_DETAIL("RpcTransport %s(): %s", funName, strerror(savedErrno));
+                    return -savedErrno;
+                }
+            } else if (processSize == 0) {
+                return DEAD_OBJECT;
+            } else {
+                buffer += processSize;
+                if (buffer == end) {
+                    return OK;
+                }
+            }
+
+            if (altPoll) {
+                if (status_t status = altPoll(); status != OK) return status;
+                if (fdTrigger->isTriggered()) {
+                    return DEAD_OBJECT;
+                }
+            } else {
+                if (status_t status = fdTrigger->triggerablePoll(mSocket.get(), event);
+                    status != OK)
+                    return status;
+                if (!havePolled) havePolled = true;
+            }
+        }
+    }
+
+    status_t interruptableWriteFully(FdTrigger* fdTrigger, const void* data, size_t size,
+                                     const std::function<status_t()>& altPoll) override {
+        return interruptableReadOrWrite(fdTrigger, reinterpret_cast<const uint8_t*>(data), size,
+                                        send, "send", POLLOUT, altPoll);
+    }
+
+    status_t interruptableReadFully(FdTrigger* fdTrigger, void* data, size_t size,
+                                    const std::function<status_t()>& altPoll) override {
+        return interruptableReadOrWrite(fdTrigger, reinterpret_cast<uint8_t*>(data), size, recv,
+                                        "recv", POLLIN, altPoll);
+    }

 private:
-    android::base::unique_fd mSocket;
+    base::unique_fd mSocket;
 };

 // RpcTransportCtx with TLS disabled.
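
To see why the alternative-poll hook above matters, here is a toy, self-contained demonstration (not libbinder code): a non-blocking writer can only complete a large transfer if the work it does while the socket buffer is full also frees space on the peer side, which is the role drainRefs plays for queued refcount commands. writeFully and the lambda below are hypothetical stand-ins.

// Toy demonstration only; writeFully() is a stand-in, not the libbinder transport.
#include <cerrno>
#include <cstdio>
#include <fcntl.h>
#include <functional>
#include <sys/socket.h>
#include <unistd.h>
#include <vector>

// Write all of [p, p+n) to a non-blocking fd, running whenFull() whenever the
// socket buffer is full instead of blocking in poll().
static bool writeFully(int fd, const char* p, size_t n, const std::function<void()>& whenFull) {
    const char* end = p + n;
    while (p != end) {
        ssize_t w = send(fd, p, end - p, MSG_NOSIGNAL);
        if (w > 0) { p += w; continue; }
        if (w < 0 && errno != EAGAIN && errno != EWOULDBLOCK) return false;
        whenFull();  // make progress elsewhere, then retry the send
    }
    return true;
}

int main() {
    int sv[2];
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0) return 1;
    fcntl(sv[0], F_SETFL, O_NONBLOCK);

    std::vector<char> big(4 << 20, 'x');   // far larger than the socket buffers
    std::vector<char> scratch(64 << 10);
    size_t drained = 0;

    bool ok = writeFully(sv[0], big.data(), big.size(), [&] {
        // Stand-in for draining pending commands: consume from the peer so the
        // writer's next send can succeed.
        ssize_t r = read(sv[1], scratch.data(), scratch.size());
        if (r > 0) drained += static_cast<size_t>(r);
    });
    printf("write ok=%d, drained %zu bytes while writing\n", ok, drained);
}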