Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1b304299 authored by Steven Moreland's avatar Steven Moreland
Browse files

libbinder: respect 'reverse' cncts are 'incoming'

Reverse connections are incoming. This probably should have been done as
part of the client/server disambiguation CL, but doing it now.

Bug: N/A
Test: N/A
Change-Id: Ie66e92ccb826bf13094a79bfd38bff260e9c6de6
parent fba6f77a
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -249,7 +249,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
              statusToString(status).c_str());
        // still need to cleanup before we can return
    }
    bool reverse = header.options & RPC_CONNECTION_OPTION_REVERSE;
    bool incoming = header.options & RPC_CONNECTION_OPTION_INCOMING;

    std::thread thisThread;
    sp<RpcSession> session;
@@ -274,8 +274,8 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        RpcAddress sessionId = RpcAddress::fromRawEmbedded(&header.sessionId);

        if (sessionId.isZero()) {
            if (reverse) {
                ALOGE("Cannot create a new session with a reverse connection, would leak");
            if (incoming) {
                ALOGE("Cannot create a new session with an incoming connection, would leak");
                return;
            }

@@ -313,7 +313,7 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
            session = it->second;
        }

        if (reverse) {
        if (incoming) {
            LOG_ALWAYS_FATAL_IF(!session->addOutgoingConnection(std::move(clientFd), true),
                                "server state must already be initialized");
            return;
+6 −6
Original line number Diff line number Diff line
@@ -399,7 +399,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
                            mOutgoingConnections.size());
    }

    if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*reverse*/)) return false;
    if (!setupOneSocketConnection(addr, RpcAddress::zero(), false /*incoming*/)) return false;

    // TODO(b/189955605): we should add additional sessions dynamically
    // instead of all at once.
@@ -420,7 +420,7 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
    // we've already setup one client
    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
        // TODO(b/189955605): shutdown existing connections?
        if (!setupOneSocketConnection(addr, mId.value(), false /*reverse*/)) return false;
        if (!setupOneSocketConnection(addr, mId.value(), false /*incoming*/)) return false;
    }

    // TODO(b/189955605): we should add additional sessions dynamically
@@ -430,14 +430,14 @@ bool RpcSession::setupSocketClient(const RpcSocketAddress& addr) {
    // any requests at all.

    for (size_t i = 0; i < mMaxThreads; i++) {
        if (!setupOneSocketConnection(addr, mId.value(), true /*reverse*/)) return false;
        if (!setupOneSocketConnection(addr, mId.value(), true /*incoming*/)) return false;
    }

    return true;
}

bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const RpcAddress& id,
                                          bool reverse) {
                                          bool incoming) {
    for (size_t tries = 0; tries < 5; tries++) {
        if (tries > 0) usleep(10000);

@@ -464,7 +464,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp
        RpcConnectionHeader header{.options = 0};
        memcpy(&header.sessionId, &id.viewRawEmbedded(), sizeof(RpcWireAddress));

        if (reverse) header.options |= RPC_CONNECTION_OPTION_REVERSE;
        if (incoming) header.options |= RPC_CONNECTION_OPTION_INCOMING;

        if (sizeof(header) != TEMP_FAILURE_RETRY(write(serverFd.get(), &header, sizeof(header)))) {
            int savedErrno = errno;
@@ -475,7 +475,7 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, const Rp

        LOG_RPC_DETAIL("Socket at %s client with fd %d", addr.toString().c_str(), serverFd.get());

        if (reverse) {
        if (incoming) {
            return addIncomingConnection(std::move(serverFd));
        } else {
            return addOutgoingConnection(std::move(serverFd), true);
+2 −2
Original line number Diff line number Diff line
@@ -21,7 +21,7 @@ namespace android {
#pragma clang diagnostic error "-Wpadded"

enum : uint8_t {
    RPC_CONNECTION_OPTION_REVERSE = 0x1,
    RPC_CONNECTION_OPTION_INCOMING = 0x1, // default is outgoing
};

constexpr uint64_t RPC_WIRE_ADDRESS_OPTION_CREATED = 1 << 0; // distinguish from '0' address
@@ -47,7 +47,7 @@ struct RpcConnectionHeader {
/**
 * Whenever a client connection is setup, this is sent as the initial
 * transaction. The main use of this is in order to control the timing for when
 * a reverse connection is setup.
 * an incoming connection is setup.
 */
struct RpcOutgoingConnectionInit {
    char msg[4];
+6 −7
Original line number Diff line number Diff line
@@ -391,7 +391,7 @@ public:
    // This creates a new process serving an interface on a certain number of
    // threads.
    ProcessSession createRpcTestSocketServerProcess(
            size_t numThreads, size_t numSessions, size_t numReverseConnections,
            size_t numThreads, size_t numSessions, size_t numIncomingConnections,
            const std::function<void(const sp<RpcServer>&)>& configure) {
        CHECK_GE(numSessions, 1) << "Must have at least one session to a server";

@@ -446,7 +446,7 @@ public:

        for (size_t i = 0; i < numSessions; i++) {
            sp<RpcSession> session = RpcSession::make();
            session->setMaxThreads(numReverseConnections);
            session->setMaxThreads(numIncomingConnections);

            switch (socketType) {
                case SocketType::UNIX:
@@ -468,12 +468,11 @@ public:
        return ret;
    }

    BinderRpcTestProcessSession createRpcTestSocketServerProcess(size_t numThreads,
                                                                 size_t numSessions = 1,
                                                                 size_t numReverseConnections = 0) {
    BinderRpcTestProcessSession createRpcTestSocketServerProcess(
            size_t numThreads, size_t numSessions = 1, size_t numIncomingConnections = 0) {
        BinderRpcTestProcessSession ret{
                .proc = createRpcTestSocketServerProcess(numThreads, numSessions,
                                                         numReverseConnections,
                                                         numIncomingConnections,
                                                         [&](const sp<RpcServer>& server) {
                                                             sp<MyBinderRpcTest> service =
                                                                     new MyBinderRpcTest;
@@ -1016,7 +1015,7 @@ TEST_P(BinderRpc, Callbacks) {
                    EXPECT_EQ(DEAD_OBJECT, status.transactionError()) << status;
                }

                // since this session has a reverse connection w/ a threadpool, we
                // since this session has an incoming connection w/ a threadpool, we
                // need to manually shut it down
                EXPECT_TRUE(proc.proc.sessions.at(0).session->shutdownAndWait(true));