Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a0fa1b88 authored by Steven Moreland's avatar Steven Moreland Committed by Automerger Merge Worker
Browse files

Merge changes Ie1fc2d92,Ie66e92cc,I09a4520a,Ia13d0dc1 am: 1bbd2eef am:...

Merge changes Ie1fc2d92,Ie66e92cc,I09a4520a,Ia13d0dc1 am: 1bbd2eef am: 0c008e79 am: 7bae0b32 am: 1e08a170

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/1769154

Change-Id: I13e0b8f28f8aac3f67583afffd5ecf925bc8999b
parents be8ce7f1 1e08a170
Loading
Loading
Loading
Loading
+6 −5
Original line number Original line Diff line number Diff line
@@ -16,6 +16,7 @@


#define LOG_TAG "RpcServer"
#define LOG_TAG "RpcServer"


#include <poll.h>
#include <sys/socket.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <sys/un.h>


@@ -152,7 +153,7 @@ void RpcServer::join() {
    }
    }


    status_t status;
    status_t status;
    while ((status = mShutdownTrigger->triggerablePollRead(mServer)) == OK) {
    while ((status = mShutdownTrigger->triggerablePoll(mServer, POLLIN)) == OK) {
        unique_fd clientFd(TEMP_FAILURE_RETRY(
        unique_fd clientFd(TEMP_FAILURE_RETRY(
                accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));
                accept4(mServer.get(), nullptr, nullptr /*length*/, SOCK_CLOEXEC)));


@@ -250,7 +251,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
              statusToString(status).c_str());
              statusToString(status).c_str());
        // still need to cleanup before we can return
        // still need to cleanup before we can return
    }
    }
    bool reverse = header.options & RPC_CONNECTION_OPTION_REVERSE;
    bool incoming = header.options & RPC_CONNECTION_OPTION_INCOMING;


    std::thread thisThread;
    std::thread thisThread;
    sp<RpcSession> session;
    sp<RpcSession> session;
@@ -275,8 +276,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        RpcAddress sessionId = RpcAddress::fromRawEmbedded(&header.sessionId);
        RpcAddress sessionId = RpcAddress::fromRawEmbedded(&header.sessionId);


        if (sessionId.isZero()) {
        if (sessionId.isZero()) {
            if (reverse) {
            if (incoming) {
                ALOGE("Cannot create a new session with a reverse connection, would leak");
                ALOGE("Cannot create a new session with an incoming connection, would leak");
                return;
                return;
            }
            }


@@ -314,7 +315,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
            session = it->second;
            session = it->second;
        }
        }


        if (reverse) {
        if (incoming) {
            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true),
            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true),
                                "server state must already be initialized");
                                "server state must already be initialized");
            return;
            return;
+63 −36
Original line number Original line Diff line number Diff line
@@ -175,9 +175,11 @@ bool RpcSession::FdTrigger::isTriggered() {
    return mWrite == -1;
    return mWrite == -1;
}
}


status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
status_t RpcSession::FdTrigger::triggerablePoll(base::borrowed_fd fd, int16_t event) {
    while (true) {
    while (true) {
        pollfd pfd[]{{.fd = fd.get(), .events = POLLIN | POLLHUP, .revents = 0},
        pollfd pfd[]{{.fd = fd.get(),
                      .events = static_cast<int16_t>(event | POLLHUP),
                      .revents = 0},
                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
                     {.fd = mRead.get(), .events = POLLHUP, .revents = 0}};
        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
        int ret = TEMP_FAILURE_RETRY(poll(pfd, arraysize(pfd), -1));
        if (ret < 0) {
        if (ret < 0) {
@@ -189,10 +191,31 @@ status_t RpcSession::FdTrigger::triggerablePollRead(base::borrowed_fd fd) {
        if (pfd[1].revents & POLLHUP) {
        if (pfd[1].revents & POLLHUP) {
            return -ECANCELED;
            return -ECANCELED;
        }
        }
        return pfd[0].revents & POLLIN ? OK : DEAD_OBJECT;
        return pfd[0].revents & event ? OK : DEAD_OBJECT;
    }
    }
}
}


status_t RpcSession::FdTrigger::interruptableWriteFully(base::borrowed_fd fd, const void* data,
                                                        size_t size) {
    const uint8_t* buffer = reinterpret_cast<const uint8_t*>(data);
    const uint8_t* end = buffer + size;

    MAYBE_WAIT_IN_FLAKE_MODE;

    status_t status;
    while ((status = triggerablePoll(fd, POLLOUT)) == OK) {
        ssize_t writeSize = TEMP_FAILURE_RETRY(send(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
        if (writeSize == 0) return DEAD_OBJECT;

        if (writeSize < 0) {
            return -errno;
        }
        buffer += writeSize;
        if (buffer == end) return OK;
    }
    return status;
}

status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, void* data,
status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, void* data,
                                                       size_t size) {
                                                       size_t size) {
    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
    uint8_t* buffer = reinterpret_cast<uint8_t*>(data);
@@ -201,7 +224,7 @@ status_t RpcSession::FdTrigger::interruptableReadFully(base::borrowed_fd fd, voi
    MAYBE_WAIT_IN_FLAKE_MODE;
    MAYBE_WAIT_IN_FLAKE_MODE;


    status_t status;
    status_t status;
    while ((status = triggerablePollRead(fd)) == OK) {
    while ((status = triggerablePoll(fd, POLLIN)) == OK) {
        ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
        ssize_t readSize = TEMP_FAILURE_RETRY(recv(fd.get(), buffer, end - buffer, MSG_NOSIGNAL));
        if (readSize == 0) return DEAD_OBJECT; // EOF
        if (readSize == 0) return DEAD_OBJECT; // EOF


@@ -330,7 +353,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
                            mOutgoingConnections.size());
                            mOutgoingConnections.size());
    }
    }


    if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*reverse*/)) return false;
    if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*incoming*/)) return false;


    // TODO(b/189955605): we should add additional sessions dynamically
    // TODO(b/189955605): we should add additional sessions dynamically
    // instead of all at once.
    // instead of all at once.
@@ -351,7 +374,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
    // we've already setup one client
    // we've already setup one client
    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
        // TODO(b/189955605): shutdown existing connections?
        // TODO(b/189955605): shutdown existing connections?
        if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false;
        if (!setupOneSocketConnection(addr, mId.value(), false /*incoming*/)) return false;
    }
    }


    // TODO(b/189955605): we should add additional sessions dynamically
    // TODO(b/189955605): we should add additional sessions dynamically
@@ -361,14 +384,14 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
    // any requests at all.
    // any requests at all.


    for (size_t i = 0; i < mMaxThreads; i++) {
    for (size_t i = 0; i < mMaxThreads; i++) {
        if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false;
        if (!setupOneSocketConnection(addr, mId.value(), true /*incoming*/)) return false;
    }
    }


    return true;
    return true;
}
}


bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id,
bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id,
                                          bool reverse) {
                                          bool incoming) {
    for (size_t tries = 0; tries < 5; tries++) {
    for (size_t tries = 0; tries < 5; tries++) {
        if (tries > 0) usleep(10000);
        if (tries > 0) usleep(10000);


@@ -395,7 +418,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp
        RpcConnectionHeader header{.options = 0};
        RpcConnectionHeader header{.options = 0};
        memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress));
        memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress));


        if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE;
        if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;


        if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
        if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
            int savedErrno = errno;
            int savedErrno = errno;
@@ -406,7 +429,18 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp


        LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());
        LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());


        if (reverse) {
        if (incoming) {
            return addIncomingConnection(std::move(serverFd));
        } else {
            return addOutgoingConnection(std::move(serverFd), true);
        }
    }

    ALOGE("Ran out of retries to connect to %s", addr.toString().c_str());
    return false;
}

bool RpcSession::addIncomingConnection(unique_fd fd) {
    std::mutex mutex;
    std::mutex mutex;
    std::condition_variable joinCv;
    std::condition_variable joinCv;
    std::unique_lock<std::mutex> lock(mutex);
    std::unique_lock<std::mutex> lock(mutex);
@@ -415,13 +449,13 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp
    bool ownershipTransferred = false;
    bool ownershipTransferred = false;
    thread = std::thread([&]() {
    thread = std::thread([&]() {
        std::unique_lock<std::mutex> threadLock(mutex);
        std::unique_lock<std::mutex> threadLock(mutex);
                unique_fd fd = std::move(serverFd);
        unique_fd movedFd = std::move(fd);
        // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
        // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
        sp<RpcSession> session = thiz;
        sp<RpcSession> session = thiz;
        session->preJoinThreadOwnership(std::move(thread));
        session->preJoinThreadOwnership(std::move(thread));


        // only continue once we have a response or the connection fails
        // only continue once we have a response or the connection fails
                auto setupResult = session->preJoinSetup(std::move(fd));
        auto setupResult = session->preJoinSetup(std::move(movedFd));


        ownershipTransferred = true;
        ownershipTransferred = true;
        threadLock.unlock();
        threadLock.unlock();
@@ -433,13 +467,6 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp
    joinCv.wait(lock, [&] { return ownershipTransferred; });
    joinCv.wait(lock, [&] { return ownershipTransferred; });
    LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
    LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
    return true;
    return true;
        } else {
            return addOutgoingConnection(std::move(serverFd), true);
        }
    }

    ALOGE("Ran out of retries to connect to %s", addr.toString().c_str());
    return false;
}
}


bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) {
bool RpcSession::addOutgoingConnection(unique_fd fd, bool init) {
+6 −10
Original line number Original line Diff line number Diff line
@@ -275,23 +275,19 @@ status_t RpcState::rpcSend(const sp<RpcSession::RpcConnection>& connection,
    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
    LOG_RPC_DETAIL("Sending %s on fd %d: %s", what, connection->fd.get(),
                   hexString(data, size).c_str());
                   hexString(data, size).c_str());


    MAYBE_WAIT_IN_FLAKE_MODE;

    if (size > std::numeric_limits<ssize_t>::max()) {
    if (size > std::numeric_limits<ssize_t>::max()) {
        ALOGE("Cannot send %s at size %zu (too big)", what, size);
        ALOGE("Cannot send %s at size %zu (too big)", what, size);
        (void)session->shutdownAndWait(false);
        (void)session->shutdownAndWait(false);
        return BAD_VALUE;
        return BAD_VALUE;
    }
    }


    ssize_t sent = TEMP_FAILURE_RETRY(send(connection->fd.get(), data, size, MSG_NOSIGNAL));
    if (status_t status = session->mShutdownTrigger->interruptableWriteFully(connection->fd.get(),

                                                                             data, size);
    if (sent < 0 || sent != static_cast<ssize_t>(size)) {
        status != OK) {
        int savedErrno = errno;
        LOG_RPC_DETAIL("Failed to write %s (%zu bytes) on fd %d, error: %s", what, size,
        LOG_RPC_DETAIL("Failed to send %s (sent %zd of %zu bytes) on fd %d, error: %s", what, sent,
                       connection->fd.get(), statusToString(status).c_str());
                       size, connection->fd.get(), strerror(savedErrno));

        (void)session->shutdownAndWait(false);
        (void)session->shutdownAndWait(false);
        return -savedErrno;
        return status;
    }
    }


    return OK;
    return OK;
+2 −2
Original line number Original line Diff line number Diff line
@@ -21,7 +21,7 @@ namespace android {
#pragma clang diagnostic error "-Wpadded"
#pragma clang diagnostic error "-Wpadded"


enum : uint8_t {
enum : uint8_t {
    RPC_CONNECTION_OPTION_REVERSE = 0x1,
    RPC_CONNECTION_OPTION_INCOMING = 0x1, // default is outgoing
};
};


constexpr uint64_t RPC_WIRE_ADDRESS_OPTION_CREATED = 1 << 0; // distinguish from '0' address
constexpr uint64_t RPC_WIRE_ADDRESS_OPTION_CREATED = 1 << 0; // distinguish from '0' address
@@ -47,7 +47,7 @@ struct RpcConnectionHeader {
/**
/**
 * Whenever a client connection is setup, this is sent as the initial
 * Whenever a client connection is setup, this is sent as the initial
 * transaction. The main use of this is in order to control the timing for when
 * transaction. The main use of this is in order to control the timing for when
 * a reverse connection is setup.
 * an incoming connection is setup.
 */
 */
struct RpcOutgoingConnectionInit {
struct RpcOutgoingConnectionInit {
    char msg[4];
    char msg[4];
+7 −3
Original line number Original line Diff line number Diff line
@@ -152,20 +152,23 @@ private:
        /**
        /**
         * Poll for a read event.
         * Poll for a read event.
         *
         *
         * event - for pollfd
         *
         * Return:
         * Return:
         *   true - time to read!
         *   true - time to read!
         *   false - trigger happened
         *   false - trigger happened
         */
         */
        status_t triggerablePollRead(base::borrowed_fd fd);
        status_t triggerablePoll(base::borrowed_fd fd, int16_t event);


        /**
        /**
         * Read, but allow the read to be interrupted by this trigger.
         * Read (or write), but allow to be interrupted by this trigger.
         *
         *
         * Return:
         * Return:
         *   true - read succeeded at 'size'
         *   true - succeeded in completely processing 'size'
         *   false - interrupted (failure or trigger)
         *   false - interrupted (failure or trigger)
         */
         */
        status_t interruptableReadFully(base::borrowed_fd fd, void* data, size_t size);
        status_t interruptableReadFully(base::borrowed_fd fd, void* data, size_t size);
        status_t interruptableWriteFully(base::borrowed_fd fd, const void* data, size_t size);


    private:
    private:
        base::unique_fd mWrite;
        base::unique_fd mWrite;
@@ -223,6 +226,7 @@ private:
    [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
    [[nodiscard]] bool setupSocketClient(const RpcSocketAddress& address);
    [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address,
    [[nodiscard]] bool setupOneSocketConnection(const RpcSocketAddress& address,
                                                const RpcAddress& sessionId, bool server);
                                                const RpcAddress& sessionId, bool server);
    [[nodiscard]] bool addIncomingConnection(base::unique_fd fd);
    [[nodiscard]] bool addOutgoingConnection(base::unique_fd fd, bool init);
    [[nodiscard]] bool addOutgoingConnection(base::unique_fd fd, bool init);
    [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
    [[nodiscard]] bool setForServer(const wp<RpcServer>& server,
                                    const wp<RpcSession::EventListener>& eventListener,
                                    const wp<RpcSession::EventListener>& eventListener,
Loading