Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 8ea49f16 authored by Pawan Wagh's avatar Pawan Wagh Committed by Automerger Merge Worker
Browse files

Merge "libbinder : Adding new type TransportFd" am: f546f5b0

parents 6917437b f546f5b0
Loading
Loading
Loading
Loading
+14 −4
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@
#include <poll.h>

#include <android-base/macros.h>
#include <android-base/scopeguard.h>

#include "RpcState.h"
namespace android {
@@ -53,25 +54,34 @@ bool FdTrigger::isTriggered() {
#endif
}

status_t FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
status_t FdTrigger::triggerablePoll(const android::TransportFd& transportFd, int16_t event) {
#ifdef BINDER_RPC_SINGLE_THREADED
    if (mTriggered) {
        return DEAD_OBJECT;
    }
#endif

    LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed", fd.get());
    LOG_ALWAYS_FATAL_IF(event == 0, "triggerablePoll %d with event 0 is not allowed",
                        transportFd.fd.get());
    pollfd pfd[]{
            {.fd = fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
            {.fd = transportFd.fd.get(), .events = static_cast<int16_t>(event), .revents = 0},
#ifndef BINDER_RPC_SINGLE_THREADED
            {.fd = mRead.get(), .events = 0, .revents = 0},
#endif
    };

    LOG_ALWAYS_FATAL_IF(transportFd.isInPollingState() == true,
                        "Only one thread should be polling on Fd!");

    transportFd.setPollingState(true);
    auto pollingStateGuard =
            android::base::make_scope_guard([&]() { transportFd.setPollingState(false); });

    int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
    if (ret < 0) {
        return -errno;
    }
    LOG_ALWAYS_FATAL_IF(ret == 0, "poll(%d) returns 0 with infinite timeout", fd.get());
    LOG_ALWAYS_FATAL_IF(ret == 0, "poll(%d) returns 0 with infinite timeout", transportFd.fd.get());

    // At least one FD has events. Check them.

+3 −1
Original line number Diff line number Diff line
@@ -21,6 +21,8 @@
#include <android-base/unique_fd.h>
#include <utils/Errors.h>

#include <binder/RpcTransport.h>

namespace android {

/** This is not a pipe. */
@@ -53,7 +55,7 @@ public:
     *   true - time to read!
     *   false - trigger happened
     */
    [[nodiscard]] status_t triggerablePoll(base::borrowed_fd fd, int16_t event);
    [[nodiscard]] status_t triggerablePoll(const android::TransportFd& transportFd, int16_t event);

private:
#ifdef BINDER_RPC_SINGLE_THREADED
+19 −19
Original line number Diff line number Diff line
@@ -86,7 +86,7 @@ status_t RpcServer::setupInetServer(const char* address, unsigned int port,
        LOG_ALWAYS_FATAL_IF(socketAddress.addr()->sa_family != AF_INET, "expecting inet");
        sockaddr_in addr{};
        socklen_t len = sizeof(addr);
        if (0 != getsockname(mServer.get(), reinterpret_cast<sockaddr*>(&addr), &len)) {
        if (0 != getsockname(mServer.fd.get(), reinterpret_cast<sockaddr*>(&addr), &len)) {
            int savedErrno = errno;
            ALOGE("Could not getsockname at %s: %s", socketAddress.toString().c_str(),
                  strerror(savedErrno));
@@ -181,7 +181,7 @@ void RpcServer::join() {

    {
        RpcMutexLockGuard _l(mLock);
        LOG_ALWAYS_FATAL_IF(!mServer.ok(), "RpcServer must be setup to join.");
        LOG_ALWAYS_FATAL_IF(!mServer.fd.ok(), "RpcServer must be setup to join.");
        LOG_ALWAYS_FATAL_IF(mShutdownTrigger != nullptr, "Already joined");
        mJoinThreadRunning = true;
        mShutdownTrigger = FdTrigger::make();
@@ -194,24 +194,24 @@ void RpcServer::join() {
        static_assert(addr.size() >= sizeof(sockaddr_storage), "kRpcAddressSize is too small");

        socklen_t addrLen = addr.size();
        unique_fd clientFd(
                TEMP_FAILURE_RETRY(accept4(mServer.get(), reinterpret_cast<sockaddr*>(addr.data()),
                                           &addrLen, SOCK_CLOEXEC | SOCK_NONBLOCK)));
        TransportFd clientSocket(unique_fd(TEMP_FAILURE_RETRY(
                accept4(mServer.fd.get(), reinterpret_cast<sockaddr*>(addr.data()), &addrLen,
                        SOCK_CLOEXEC | SOCK_NONBLOCK))));

        LOG_ALWAYS_FATAL_IF(addrLen > static_cast<socklen_t>(sizeof(sockaddr_storage)),
                            "Truncated address");

        if (clientFd < 0) {
        if (clientSocket.fd < 0) {
            ALOGE("Could not accept4 socket: %s", strerror(errno));
            continue;
        }
        LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.get(), clientFd.get());
        LOG_RPC_DETAIL("accept4 on fd %d yields fd %d", mServer.fd.get(), clientSocket.fd.get());

        {
            RpcMutexLockGuard _l(mLock);
            RpcMaybeThread thread =
                    RpcMaybeThread(&RpcServer::establishConnection,
                                   sp<RpcServer>::fromExisting(this), std::move(clientFd), addr,
                                   sp<RpcServer>::fromExisting(this), std::move(clientSocket), addr,
                                   addrLen, RpcSession::join);

            auto& threadRef = mConnectingThreads[thread.get_id()];
@@ -296,7 +296,7 @@ size_t RpcServer::numUninitializedSessions() {
}

void RpcServer::establishConnection(
        sp<RpcServer>&& server, base::unique_fd clientFd, std::array<uint8_t, kRpcAddressSize> addr,
        sp<RpcServer>&& server, TransportFd clientFd, std::array<uint8_t, kRpcAddressSize> addr,
        size_t addrLen,
        std::function<void(sp<RpcSession>&&, RpcSession::PreJoinSetupResult&&)>&& joinFn) {
    // mShutdownTrigger can only be cleared once connection threads have joined.
@@ -306,7 +306,7 @@ void RpcServer::establishConnection(

    status_t status = OK;

    int clientFdForLog = clientFd.get();
    int clientFdForLog = clientFd.fd.get();
    auto client = server->mCtx->newTransport(std::move(clientFd), server->mShutdownTrigger.get());
    if (client == nullptr) {
        ALOGE("Dropping accept4()-ed socket because sslAccept fails");
@@ -488,15 +488,15 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
    LOG_RPC_DETAIL("Setting up socket server %s", addr.toString().c_str());
    LOG_ALWAYS_FATAL_IF(hasServer(), "Each RpcServer can only have one server.");

    unique_fd serverFd(TEMP_FAILURE_RETRY(
            socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0)));
    if (serverFd == -1) {
    TransportFd transportFd(unique_fd(TEMP_FAILURE_RETRY(
            socket(addr.addr()->sa_family, SOCK_STREAM | SOCK_CLOEXEC | SOCK_NONBLOCK, 0))));
    if (!transportFd.fd.ok()) {
        int savedErrno = errno;
        ALOGE("Could not create socket: %s", strerror(savedErrno));
        return -savedErrno;
    }

    if (0 != TEMP_FAILURE_RETRY(bind(serverFd.get(), addr.addr(), addr.addrSize()))) {
    if (0 != TEMP_FAILURE_RETRY(bind(transportFd.fd.get(), addr.addr(), addr.addrSize()))) {
        int savedErrno = errno;
        ALOGE("Could not bind socket at %s: %s", addr.toString().c_str(), strerror(savedErrno));
        return -savedErrno;
@@ -506,7 +506,7 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
    // the backlog is increased to a large number.
    // TODO(b/189955605): Once we create threads dynamically & lazily, the backlog can be reduced
    //  to 1.
    if (0 != TEMP_FAILURE_RETRY(listen(serverFd.get(), 50 /*backlog*/))) {
    if (0 != TEMP_FAILURE_RETRY(listen(transportFd.fd.get(), 50 /*backlog*/))) {
        int savedErrno = errno;
        ALOGE("Could not listen socket at %s: %s", addr.toString().c_str(), strerror(savedErrno));
        return -savedErrno;
@@ -514,7 +514,7 @@ status_t RpcServer::setupSocketServer(const RpcSocketAddress& addr) {

    LOG_RPC_DETAIL("Successfully setup socket server %s", addr.toString().c_str());

    if (status_t status = setupExternalServer(std::move(serverFd)); status != OK) {
    if (status_t status = setupExternalServer(std::move(transportFd.fd)); status != OK) {
        ALOGE("Another thread has set up server while calling setupSocketServer. Race?");
        return status;
    }
@@ -542,17 +542,17 @@ void RpcServer::onSessionIncomingThreadEnded() {

bool RpcServer::hasServer() {
    RpcMutexLockGuard _l(mLock);
    return mServer.ok();
    return mServer.fd.ok();
}

unique_fd RpcServer::releaseServer() {
    RpcMutexLockGuard _l(mLock);
    return std::move(mServer);
    return std::move(mServer.fd);
}

status_t RpcServer::setupExternalServer(base::unique_fd serverFd) {
    RpcMutexLockGuard _l(mLock);
    if (mServer.ok()) {
    if (mServer.fd.ok()) {
        ALOGE("Each RpcServer can only have one server.");
        return INVALID_OPERATION;
    }
+17 −10
Original line number Diff line number Diff line
@@ -162,7 +162,8 @@ status_t RpcSession::setupInetClient(const char* addr, unsigned int port) {
    return NAME_NOT_FOUND;
}

status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_fd()>&& request) {
status_t RpcSession::setupPreconnectedClient(base::unique_fd fd,
                                             std::function<unique_fd()>&& request) {
    return setupClient([&](const std::vector<uint8_t>& sessionId, bool incoming) -> status_t {
        if (!fd.ok()) {
            fd = request();
@@ -172,7 +173,9 @@ status_t RpcSession::setupPreconnectedClient(unique_fd fd, std::function<unique_
            ALOGE("setupPreconnectedClient: %s", res.error().message().c_str());
            return res.error().code() == 0 ? UNKNOWN_ERROR : -res.error().code();
        }
        status_t status = initAndAddConnection(std::move(fd), sessionId, incoming);

        TransportFd transportFd(std::move(fd));
        status_t status = initAndAddConnection(std::move(transportFd), sessionId, incoming);
        fd = unique_fd(); // Explicitly reset after move to avoid analyzer warning.
        return status;
    });
@@ -190,7 +193,8 @@ status_t RpcSession::addNullDebuggingClient() {
        return -savedErrno;
    }

    auto server = mCtx->newTransport(std::move(serverFd), mShutdownTrigger.get());
    TransportFd transportFd(std::move(serverFd));
    auto server = mCtx->newTransport(std::move(transportFd), mShutdownTrigger.get());
    if (server == nullptr) {
        ALOGE("Unable to set up RpcTransport");
        return UNKNOWN_ERROR;
@@ -572,12 +576,14 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr,
            return -savedErrno;
        }

        if (0 != TEMP_FAILURE_RETRY(connect(serverFd.get(), addr.addr(), addr.addrSize()))) {
        TransportFd transportFd(std::move(serverFd));

        if (0 != TEMP_FAILURE_RETRY(connect(transportFd.fd.get(), addr.addr(), addr.addrSize()))) {
            int connErrno = errno;
            if (connErrno == EAGAIN || connErrno == EINPROGRESS) {
                // For non-blocking sockets, connect() may return EAGAIN (for unix domain socket) or
                // EINPROGRESS (for others). Call poll() and getsockopt() to get the error.
                status_t pollStatus = mShutdownTrigger->triggerablePoll(serverFd, POLLOUT);
                status_t pollStatus = mShutdownTrigger->triggerablePoll(transportFd, POLLOUT);
                if (pollStatus != OK) {
                    ALOGE("Could not POLLOUT after connect() on non-blocking socket: %s",
                          statusToString(pollStatus).c_str());
@@ -585,8 +591,8 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr,
                }
                // Set connErrno to the errno that connect() would have set if the fd were blocking.
                socklen_t connErrnoLen = sizeof(connErrno);
                int ret =
                        getsockopt(serverFd.get(), SOL_SOCKET, SO_ERROR, &connErrno, &connErrnoLen);
                int ret = getsockopt(transportFd.fd.get(), SOL_SOCKET, SO_ERROR, &connErrno,
                                     &connErrnoLen);
                if (ret == -1) {
                    int savedErrno = errno;
                    ALOGE("Could not getsockopt() after connect() on non-blocking socket: %s. "
@@ -608,16 +614,17 @@ status_t RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr,
                return -connErrno;
            }
        }
        LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
        LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(),
                       transportFd.fd.get());

        return initAndAddConnection(std::move(serverFd), sessionId, incoming);
        return initAndAddConnection(std::move(transportFd), sessionId, incoming);
    }

    ALOGE("Ran out of retries to connect to %s", addr.toString().c_str());
    return UNKNOWN_ERROR;
}

status_t RpcSession::initAndAddConnection(unique_fd fd, const std::vector<uint8_t>& sessionId,
status_t RpcSession::initAndAddConnection(TransportFd fd, const std::vector<uint8_t>& sessionId,
                                          bool incoming) {
    LOG_ALWAYS_FATAL_IF(mShutdownTrigger == nullptr);
    auto server = mCtx->newTransport(std::move(fd), mShutdownTrigger.get());
+15 −13
Original line number Diff line number Diff line
@@ -36,11 +36,11 @@ constexpr size_t kMaxFdsPerMsg = 253;
// RpcTransport with TLS disabled.
class RpcTransportRaw : public RpcTransport {
public:
    explicit RpcTransportRaw(android::base::unique_fd socket) : mSocket(std::move(socket)) {}
    explicit RpcTransportRaw(android::TransportFd socket) : mSocket(std::move(socket)) {}
    status_t pollRead(void) override {
        uint8_t buf;
        ssize_t ret = TEMP_FAILURE_RETRY(
                ::recv(mSocket.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT));
                ::recv(mSocket.fd.get(), &buf, sizeof(buf), MSG_PEEK | MSG_DONTWAIT));
        if (ret < 0) {
            int savedErrno = errno;
            if (savedErrno == EAGAIN || savedErrno == EWOULDBLOCK) {
@@ -100,7 +100,7 @@ public:
                msg.msg_controllen = CMSG_SPACE(fdsByteSize);

                ssize_t processedSize = TEMP_FAILURE_RETRY(
                        sendmsg(mSocket.get(), &msg, MSG_NOSIGNAL | MSG_CMSG_CLOEXEC));
                        sendmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL | MSG_CMSG_CLOEXEC));
                if (processedSize > 0) {
                    sentFds = true;
                }
@@ -113,10 +113,10 @@ public:
                    // non-negative int and can be cast to either.
                    .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs),
            };
            return TEMP_FAILURE_RETRY(sendmsg(mSocket.get(), &msg, MSG_NOSIGNAL));
            return TEMP_FAILURE_RETRY(sendmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL));
        };
        return interruptableReadOrWrite(mSocket.get(), fdTrigger, iovs, niovs, send, "sendmsg",
                                        POLLOUT, altPoll);
        return interruptableReadOrWrite(mSocket, fdTrigger, iovs, niovs, send, "sendmsg", POLLOUT,
                                        altPoll);
    }

    status_t interruptableReadFully(
@@ -135,7 +135,7 @@ public:
                        .msg_controllen = sizeof(msgControlBuf),
                };
                ssize_t processSize =
                        TEMP_FAILURE_RETRY(recvmsg(mSocket.get(), &msg, MSG_NOSIGNAL));
                        TEMP_FAILURE_RETRY(recvmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL));
                if (processSize < 0) {
                    return -1;
                }
@@ -171,21 +171,23 @@ public:
                    // non-negative int and can be cast to either.
                    .msg_iovlen = static_cast<decltype(msg.msg_iovlen)>(niovs),
            };
            return TEMP_FAILURE_RETRY(recvmsg(mSocket.get(), &msg, MSG_NOSIGNAL));
            return TEMP_FAILURE_RETRY(recvmsg(mSocket.fd.get(), &msg, MSG_NOSIGNAL));
        };
        return interruptableReadOrWrite(mSocket.get(), fdTrigger, iovs, niovs, recv, "recvmsg",
                                        POLLIN, altPoll);
        return interruptableReadOrWrite(mSocket, fdTrigger, iovs, niovs, recv, "recvmsg", POLLIN,
                                        altPoll);
    }

    virtual bool isWaiting() { return mSocket.isInPollingState(); }

private:
    base::unique_fd mSocket;
    android::TransportFd mSocket;
};

// RpcTransportCtx with TLS disabled.
class RpcTransportCtxRaw : public RpcTransportCtx {
public:
    std::unique_ptr<RpcTransport> newTransport(android::base::unique_fd fd, FdTrigger*) const {
        return std::make_unique<RpcTransportRaw>(std::move(fd));
    std::unique_ptr<RpcTransport> newTransport(android::TransportFd socket, FdTrigger*) const {
        return std::make_unique<RpcTransportRaw>(std::move(socket));
    }
    std::vector<uint8_t> getCertificate(RpcCertificateFormat) const override { return {}; }
};
Loading