Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3e94dc96 authored by Yifan Hong's avatar Yifan Hong Committed by Automerger Merge Worker
Browse files

Merge changes I809345c5,Ic6c6bec4 am: 5bff7428 am: bce82c3f

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1747934

Change-Id: Iff9fa6aa6c293d60fa1669817ac5d6e342ae470b
parents b0e16c10 bce82c3f
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -120,6 +120,7 @@ cc_library {
        "RpcSession.cpp",
        "RpcServer.cpp",
        "RpcState.cpp",
        "RpcTransportRaw.cpp",
        "Static.cpp",
        "Stability.cpp",
        "Status.cpp",
+37 −12
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@
#include <android-base/scopeguard.h>
#include <binder/Parcel.h>
#include <binder/RpcServer.h>
#include <binder/RpcTransportRaw.h>
#include <log/log.h>

#include "RpcSocketAddress.h"
@@ -37,13 +38,17 @@ namespace android {
using base::ScopeGuard;
using base::unique_fd;

RpcServer::RpcServer() {}
RpcServer::RpcServer(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory)
      : mRpcTransportCtxFactory(std::move(rpcTransportCtxFactory)) {}
RpcServer::~RpcServer() {
    (void)shutdown();
}

sp<RpcServer> RpcServer::make() {
    return sp<RpcServer>::make();
sp<RpcServer> RpcServer::make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory) {
    // Default is without TLS.
    if (rpcTransportCtxFactory == nullptr)
        rpcTransportCtxFactory = RpcTransportCtxFactoryRaw::make();
    return sp<RpcServer>::make(std::move(rpcTransportCtxFactory));
}

void RpcServer::iUnderstandThisCodeIsExperimentalAndIWillNotUseItInProduction() {
@@ -153,6 +158,10 @@ void RpcServer::join() {
        mJoinThreadRunning = true;
        mShutdownTrigger = RpcSession::FdTrigger::make();
        LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr, "Cannot create join signaler");

        mCtx = mRpcTransportCtxFactory->newServerCtx();
        LOG_ALWAYS_FATAL_IF(mCtx == nullptr, "Unable to create RpcTransportCtx with %s sockets",
                            mRpcTransportCtxFactory->toCString());
    }

    status_t status;
@@ -219,6 +228,7 @@ bool RpcServer::shutdown() {
    LOG_RPC_DETAIL("Finished waiting on shutdown.");

    mShutdownTrigger = nullptr;
    mCtx = nullptr;
    return true;
}

@@ -244,15 +254,30 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
    // mShutdownTrigger can only be cleared once connection threads have joined.
    // It must be set before this thread is started
    LOG_ALWAYS_FATAL_IF(server->mShutdownTrigger == nullptr);
    LOG_ALWAYS_FATAL_IF(server->mCtx == nullptr);

    status_t status = OK;

    int clientFdForLog = clientFd.get();
    auto client = server->mCtx->newTransport(std::move(clientFd));
    if (client == nullptr) {
        ALOGE("Dropping accept4()-ed socket because sslAccept fails");
        status = DEAD_OBJECT;
        // still need to cleanup before we can return
    } else {
        LOG_RPC_DETAIL("Created RpcTransport %p for client fd %d", client.get(), clientFdForLog);
    }

    RpcConnectionHeader header;
    status_t status = server->mShutdownTrigger->interruptableReadFully(clientFd.get(), &header,
    if (status == OK) {
        status = server->mShutdownTrigger->interruptableReadFully(client.get(), &header,
                                                                  sizeof(header));
        if (status != OK) {
            ALOGE("Failed to read ID for client connecting to RPC server: %s",
                  statusToString(status).c_str());
            // still need to cleanup before we can return
        }
    }

    bool incoming = false;
    uint32_t protocolVersion = 0;
@@ -271,7 +296,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
                    .version = protocolVersion,
            };

            status = server->mShutdownTrigger->interruptableWriteFully(clientFd.get(), &response,
            status = server->mShutdownTrigger->interruptableWriteFully(client.get(), &response,
                                                                       sizeof(response));
            if (status != OK) {
                ALOGE("Failed to send new session response: %s", statusToString(status).c_str());
@@ -341,7 +366,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        }

        if (incoming) {
            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true),
            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(client), true),
                                "server state must already be initialized");
            return;
        }
@@ -350,7 +375,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        session->preJoinThreadOwnership(std::move(thisThread));
    }

    auto setupResult = session->preJoinSetup(std::move(clientFd));
    auto setupResult = session->preJoinSetup(std::move(client));

    // avoid strong cycle
    server = nullptr;
+83 −36
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@
#include <android_runtime/vm.h>
#include <binder/Parcel.h>
#include <binder/RpcServer.h>
#include <binder/RpcTransportRaw.h>
#include <binder/Stability.h>
#include <jni.h>
#include <utils/String8.h>
@@ -46,7 +47,8 @@ namespace android {

using base::unique_fd;

RpcSession::RpcSession() {
RpcSession::RpcSession(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory)
      : mRpcTransportCtxFactory(std::move(rpcTransportCtxFactory)) {
    LOG_RPC_DETAIL("RpcSession created %p", this);

    mState = std::make_unique<RpcState>();
@@ -59,8 +61,11 @@ RpcSession::~RpcSession() {
                        "Should not be able to destroy a session with servers in use.");
}

sp<RpcSession> RpcSession::make() {
    return sp<RpcSession>::make();
sp<RpcSession> RpcSession::make(std::unique_ptr<RpcTransportCtxFactory> rpcTransportCtxFactory) {
    // Default is without TLS.
    if (rpcTransportCtxFactory == nullptr)
        rpcTransportCtxFactory = RpcTransportCtxFactoryRaw::make();
    return sp<RpcSession>::make(std::move(rpcTransportCtxFactory));
}

void RpcSession::setMaxThreads(size_t threads) {
@@ -122,6 +127,7 @@ bool RpcSession::setupInetClient(const char* addr, unsigned int port) {
}

bool RpcSession::addNullDebuggingClient() {
    // Note: only works on raw sockets.
    unique_fd serverFd(TEMP_FAILURE_RETRY(open("/dev/null", O_WRONLY | O_CLOEXEC)));

    if (serverFd == -1) {
@@ -129,7 +135,17 @@ bool RpcSession::addNullDebuggingClient() {
        return false;
    }

    return addOutgoingConnection(std::move(serverFd), false);
    auto ctx = mRpcTransportCtxFactory->newClientCtx();
    if (ctx == nullptr) {
        ALOGE("Unable to create RpcTransportCtx for null debugging client");
        return false;
    }
    auto server = ctx->newTransport(std::move(serverFd));
    if (server == nullptr) {
        ALOGE("Unable to set up RpcTransport");
        return false;
    }
    return addOutgoingConnection(std::move(server), false);
}

sp<IBinder> RpcSession::getRootObject() {
@@ -205,6 +221,10 @@ bool RpcSession::FdTrigger::isTriggered() {
    return mWrite == -1;
}

status_t RpcSession::FdTrigger::triggerablePoll(RpcTransport* rpcTransport, int16_t event) {
    return triggerablePoll(rpcTransport->pollSocket(), event);
}

status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
    while (true) {
        pollfd pfd[]{{.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
@@ -223,28 +243,30 @@ status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t ev
    }
}

status_t RpcSession::FdTrigger::interruptableWriteFully(base::borrowed_fd fd, const void* data,
                                                        size_t size) {
status_t RpcSession::FdTrigger::interruptableWriteFully(RpcTransport* rpcTransport,
                                                        const void* data, size_t size) {
    const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
    const uint8_t* end = buffer + size;

    MAYBE_WAIT_IN_FLAKE_MODE;

    status_t status;
    while ((status = triggerablePoll(fd, POLLOUT)) == OK) {
        ssize_t writeSize = TEMP_FAILURE_RETRY(send(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
        if (writeSize == 0) return DEAD_OBJECT;

        if (writeSize < 0) {
            return -errno;
    while ((status = triggerablePoll(rpcTransport, POLLOUT)) == OK) {
        auto writeSize = rpcTransport->send(buffer, end - buffer);
        if (!writeSize.ok()) {
            LOG_RPC_DETAIL("RpcTransport::send(): %s", writeSize.error().message().c_str());
            return writeSize.error().code() == 0 ? UNKNOWN_ERROR : -writeSize.error().code();
        }
        buffer += writeSize;

        if (*writeSize == 0) return DEAD_OBJECT;

        buffer += *writeSize;
        if (buffer == end) return OK;
    }
    return status;
}

status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, void* data,
status_t RpcSession::FdTrigger::interruptableReadFully(RpcTransport* rpcTransport, void* data,
                                                       size_t size) {
    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
    uint8_t* end = buffer + size;
@@ -252,14 +274,16 @@ status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, voi
    MAYBE_WAIT_IN_FLAKE_MODE;

    status_t status;
    while ((status = triggerablePoll(fd, POLLIN)) == OK) {
        ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
        if (readSize == 0) return DEAD_OBJECT; // EOF

        if (readSize < 0) {
            return -errno;
    while ((status = triggerablePoll(rpcTransport, POLLIN)) == OK) {
        auto readSize = rpcTransport->recv(buffer, end - buffer);
        if (!readSize.ok()) {
            LOG_RPC_DETAIL("RpcTransport::recv(): %s", readSize.error().message().c_str());
            return readSize.error().code() == 0 ? UNKNOWN_ERROR : -readSize.error().code();
        }
        buffer += readSize;

        if (*readSize == 0) return DEAD_OBJECT; // EOF

        buffer += *readSize;
        if (buffer == end) return OK;
    }
    return status;
@@ -312,10 +336,11 @@ void RpcSession::preJoinThreadOwnership(std::thread thread) {
    }
}

RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(
        std::unique_ptr<RpcTransport> rpcTransport) {
    // must be registered to allow arbitrary client code executing commands to
    // be able to do nested calls (we can't only read from it)
    sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(fd));
    sp<RpcConnection> connection = assignIncomingConnectionToThisThread(std::move(rpcTransport));

    status_t status;

@@ -520,6 +545,22 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp
                  strerror(savedErrno));
            return false;
        }
        LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());

        auto ctx = mRpcTransportCtxFactory->newClientCtx();
        if (ctx == nullptr) {
            ALOGE("Unable to create client RpcTransportCtx with %s sockets",
                  mRpcTransportCtxFactory->toCString());
            return false;
        }
        auto server = ctx->newTransport(std::move(serverFd));
        if (server == nullptr) {
            ALOGE("Unable to set up RpcTransport for %s", addr.toString().c_str());
            return false;
        }

        LOG_RPC_DETAIL("Socket at %s client with RpcTransport %p", addr.toString().c_str(),
                       server.get());

        RpcConnectionHeader header{
                .version = mProtocolVersion.value_or(RPC_WIRE_PROTOCOL_VERSION),
@@ -529,19 +570,24 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp

        if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;

        if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
            int savedErrno = errno;
        auto sentHeader = server->send(&header, sizeof(header));
        if (!sentHeader.ok()) {
            ALOGE("Could not write connection header to socket at %s: %s", addr.toString().c_str(),
                  strerror(savedErrno));
                  sentHeader.error().message().c_str());
            return false;
        }
        if (*sentHeader != sizeof(header)) {
            ALOGE("Could not write connection header to socket at %s: sent %zd bytes, expected %zd",
                  addr.toString().c_str(), *sentHeader, sizeof(header));
            return false;
        }

        LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
        LOG_RPC_DETAIL("Socket at %s client: header sent", addr.toString().c_str());

        if (incoming) {
            return addIncomingConnection(std::move(serverFd));
            return addIncomingConnection(std::move(server));
        } else {
            return addOutgoingConnection(std::move(serverFd), true);
            return addOutgoingConnection(std::move(server), true);
        }
    }

@@ -549,7 +595,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp
    return false;
}

bool RpcSession::addIncomingConnection(unique_fd fd) {
bool RpcSession::addIncomingConnection(std::unique_ptr<RpcTransport> rpcTransport) {
    std::mutex mutex;
    std::condition_variable joinCv;
    std::unique_lock<std::mutex> lock(mutex);
@@ -558,13 +604,13 @@ bool RpcSession::addIncomingConnection(unique_fd fd) {
    bool ownershipTransferred = false;
    thread = std::thread([&]() {
        std::unique_lock<std::mutex> threadLock(mutex);
        unique_fd movedFd = std::move(fd);
        std::unique_ptr<RpcTransport> movedRpcTransport = std::move(rpcTransport);
        // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
        sp<RpcSession> session = thiz;
        session->preJoinThreadOwnership(std::move(thread));

        // only continue once we have a response or the connection fails
        auto setupResult = session->preJoinSetup(std::move(movedFd));
        auto setupResult = session->preJoinSetup(std::move(movedRpcTransport));

        ownershipTransferred = true;
        threadLock.unlock();
@@ -578,7 +624,7 @@ bool RpcSession::addIncomingConnection(unique_fd fd) {
    return true;
}

bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) {
bool RpcSession::addOutgoingConnection(std::unique_ptr<RpcTransport> rpcTransport, bool init) {
    sp<RpcConnection> connection = sp<RpcConnection>::make();
    {
        std::lock_guard<std::mutex> _l(mMutex);
@@ -591,7 +637,7 @@ bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) {
            if (mShutdownTrigger == nullptr) return false;
        }

        connection->fd = std::move(fd);
        connection->rpcTransport = std::move(rpcTransport);
        connection->exclusiveTid = gettid();
        mOutgoingConnections.push_back(connection);
    }
@@ -626,7 +672,8 @@ bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListene
    return true;
}

sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(unique_fd fd) {
sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(
        std::unique_ptr<RpcTransport> rpcTransport) {
    std::lock_guard<std::mutex> _l(mMutex);

    if (mIncomingConnections.size() >= mMaxThreads) {
@@ -644,7 +691,7 @@ sp<RpcSession::RpcConnection> RpcSession::assignIncomingConnectionToThisThread(u
    }

    sp<RpcConnection> session = sp<RpcConnection>::make();
    session->fd = std::move(fd);
    session->rpcTransport = std::move(rpcTransport);
    session->exclusiveTid = gettid();

    mIncomingConnections.push_back(session);
+15 −13
Original line number Diff line number Diff line
@@ -273,7 +273,7 @@ RpcState::CommandData::CommandData(size_t size) : mSize(size) {
status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
                           const sp<RpcSession>& session, const char* what, const void* data,
                           size_t size) {
    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
    LOG_RPC_DETAIL("Sending %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                   android::base::HexString(data, size).c_str());

    if (size > std::numeric_limits<ssize_t>::max()) {
@@ -282,11 +282,12 @@ status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
        return BAD_VALUE;
    }

    if (status_t status = session->mShutdownTrigger->interruptableWriteFully(connection->fd.get(),
    if (status_t status =
                session->mShutdownTrigger->interruptableWriteFully(connection->rpcTransport.get(),
                                                                   data, size);
        status != OK) {
        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on fd %d, error: %s", what, size,
                       connection->fd.get(), statusToString(status).c_str());
        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                       connection->rpcTransport.get(), statusToString(status).c_str());
        (void)session->shutdownAndWait(false);
        return status;
    }
@@ -304,14 +305,15 @@ status_t RpcState::rpcRec(const sp<RpcSession::RpcConnection>& connection,
    }

    if (status_t status =
                session->mShutdownTrigger->interruptableReadFully(connection->fd.get(), data, size);
                session->mShutdownTrigger->interruptableReadFully(connection->rpcTransport.get(),
                                                                  data, size);
        status != OK) {
        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on fd %d, error: %s", what, size,
                       connection->fd.get(), statusToString(status).c_str());
        LOG_RPC_DETAIL("Failed to read %s (%zu bytes) on RpcTransport %p, error: %s", what, size,
                       connection->rpcTransport.get(), statusToString(status).c_str());
        return status;
    }

    LOG_RPC_DETAIL("Received %s on fd %d: %s", what, connection->fd.get(),
    LOG_RPC_DETAIL("Received %s on RpcTransport %p: %s", what, connection->rpcTransport.get(),
                   android::base::HexString(data, size).c_str());
    return OK;
}
@@ -490,7 +492,8 @@ status_t RpcState::transactAddress(const sp<RpcSession::RpcConnection>& connecti
        return status;

    if (flags & IBinder::FLAG_ONEWAY) {
        LOG_RPC_DETAIL("Oneway command, so no longer waiting on %d", connection->fd.get());
        LOG_RPC_DETAIL("Oneway command, so no longer waiting on RpcTransport %p",
                       connection->rpcTransport.get());

        // Do not wait on result.
        // However, too many oneway calls may cause refcounts to build up and fill up the socket,
@@ -585,7 +588,7 @@ status_t RpcState::sendDecStrong(const sp<RpcSession::RpcConnection>& connection

status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& connection,
                                        const sp<RpcSession>& session, CommandType type) {
    LOG_RPC_DETAIL("getAndExecuteCommand on fd %d", connection->fd.get());
    LOG_RPC_DETAIL("getAndExecuteCommand on RpcTransport %p", connection->rpcTransport.get());

    RpcWireHeader command;
    if (status_t status = rpcRec(connection, session, "command header", &command, sizeof(command));
@@ -598,8 +601,7 @@ status_t RpcState::getAndExecuteCommand(const sp<RpcSession::RpcConnection>& con
status_t RpcState::drainCommands(const sp<RpcSession::RpcConnection>& connection,
                                 const sp<RpcSession>& session, CommandType type) {
    uint8_t buf;
    while (0 < TEMP_FAILURE_RETRY(
                       recv(connection->fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT))) {
    while (connection->rpcTransport->peek(&buf, sizeof(buf)).value_or(-1) > 0) {
        status_t status = getAndExecuteCommand(connection, session, type);
        if (status != OK) return status;
    }
+88 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "RpcRawTransport"
#include <log/log.h>

#include <binder/RpcTransportRaw.h>

#include "RpcState.h"

using android::base::ErrnoError;
using android::base::Result;

namespace android {

namespace {

// RpcTransport with TLS disabled.
class RpcTransportRaw : public RpcTransport {
public:
    explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {}
    Result<ssize_t> send(const void *buf, int size) override {
        ssize_t ret = TEMP_FAILURE_RETRY(::send(mSocket.get(), buf, size, MSG_NOSIGNAL));
        if (ret < 0) {
            return ErrnoError() << "send()";
        }
        return ret;
    }
    Result<ssize_t> recv(void *buf, int size) override {
        ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_NOSIGNAL));
        if (ret < 0) {
            return ErrnoError() << "recv()";
        }
        return ret;
    }
    Result<ssize_t> peek(void *buf, int size) override {
        ssize_t ret = TEMP_FAILURE_RETRY(::recv(mSocket.get(), buf, size, MSG_PEEK | MSG_DONTWAIT));
        if (ret < 0) {
            return ErrnoError() << "recv(MSG_PEEK)";
        }
        return ret;
    }
    bool pending() override { return false; }
    android::base::borrowed_fd pollSocket() const override { return mSocket; }

private:
    android::base::unique_fd mSocket;
};

// RpcTransportCtx with TLS disabled.
class RpcTransportCtxRaw : public RpcTransportCtx {
public:
    std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd) const {
        return std::make_unique<RpcTransportRaw>(std::move(fd));
    }
};
} // namespace

std::unique_ptr<RpcTransportCtx> RpcTransportCtxFactoryRaw::newServerCtx() const {
    return std::make_unique<RpcTransportCtxRaw>();
}

std::unique_ptr<RpcTransportCtx> RpcTransportCtxFactoryRaw::newClientCtx() const {
    return std::make_unique<RpcTransportCtxRaw>();
}

const char *RpcTransportCtxFactoryRaw::toCString() const {
    return "raw";
}

std::unique_ptr<RpcTransportCtxFactory> RpcTransportCtxFactoryRaw::make() {
    return std::unique_ptr<RpcTransportCtxFactoryRaw>(new RpcTransportCtxFactoryRaw());
}

} // namespace android
Loading