Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c88b7fcc authored by Steven Moreland's avatar Steven Moreland
Browse files

libbinder: RPC know when connections setup

Previously, there was a race where:
a. client creates connection to server
b. client sends request for reverse connection to server (but this
  may still be traveling on the wire)
c. client sends transaction to server
d. server tries to make a callback
e. server fails to make callback because no reverse connection is setup

Now, when a new connection is setup, a header on this connection is
setup. So, we can wait on this header to be received in (b).

Note: currently, (e) results in an abort, this is tracked in b/167966510
with a TODO in the ExclusiveConnection code. This would make a less
obvious flake (or perhaps the problem would be ignored), but this race
still needs to be fixed for well-behaved clients to be able to function
reliably.

Fixes: 190639665
Test: binderRpcTest (callback test 10,000s of times)
Change-Id: I13bc912692d63ea73d46c5441fa7d51121df2f58
parent ada72bd2
Loading
Loading
Loading
Loading
+4 −2
Original line number Diff line number Diff line
@@ -307,13 +307,15 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        }

        detachGuard.Disable();
        session->preJoin(std::move(thisThread));
        session->preJoinThreadOwnership(std::move(thisThread));
    }

    auto setupResult = session->preJoinSetup(std::move(clientFd));

    // avoid strong cycle
    server = nullptr;

    RpcSession::join(std::move(session), std::move(clientFd));
    RpcSession::join(std::move(session), std::move(setupResult));
}

bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
+59 −26
Original line number Diff line number Diff line
@@ -236,7 +236,7 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::
    }
}

void RpcSession::preJoin(std::thread thread) {
void RpcSession::preJoinThreadOwnership(std::thread thread) {
    LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");

    {
@@ -245,21 +245,37 @@ void RpcSession::preJoin(std::thread thread) {
    }
}

void RpcSession::join(sp<RpcSession>&& session, unique_fd client) {
RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
    // must be registered to allow arbitrary client code executing commands to
    // be able to do nested calls (we can't only read from it)
    sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client));
    sp<RpcConnection> connection = assignServerToThisThread(std::move(fd));

    status_t status =
            mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));

    return PreJoinSetupResult{
            .connection = std::move(connection),
            .status = status,
    };
}

void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult) {
    sp<RpcConnection>& connection = setupResult.connection;

    if (setupResult.status == OK) {
        while (true) {
        status_t error = session->state()->getAndExecuteCommand(connection->fd, session,
            status_t status = session->state()->getAndExecuteCommand(connection->fd, session,
                                                                     RpcState::CommandType::ANY);

        if (error != OK) {
            if (status != OK) {
                LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
                           statusToString(error).c_str());
                               statusToString(status).c_str());
                break;
            }
        }
    } else {
        ALOGE("Connection failed to init, closing with status %s",
              statusToString(setupResult.status).c_str());
    }

    LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection),
                        "bad state: connection object guaranteed to be in list");
@@ -381,14 +397,17 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t
                unique_fd fd = std::move(serverFd);
                // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
                sp<RpcSession> session = thiz;
                session->preJoin(std::move(thread));
                ownershipTransferred = true;
                joinCv.notify_one();
                session->preJoinThreadOwnership(std::move(thread));

                // only continue once we have a response or the connection fails
                auto setupResult = session->preJoinSetup(std::move(fd));

                ownershipTransferred = true;
                threadLock.unlock();
                joinCv.notify_one();
                // do not use & vars below

                RpcSession::join(std::move(session), std::move(fd));
                RpcSession::join(std::move(session), std::move(setupResult));
            });
            joinCv.wait(lock, [&] { return ownershipTransferred; });
            LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
@@ -403,6 +422,8 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t
}

bool RpcSession::addClientConnection(unique_fd fd) {
    sp<RpcConnection> connection = sp<RpcConnection>::make();
    {
        std::lock_guard<std::mutex> _l(mMutex);

        // first client connection added, but setForServer not called, so
@@ -413,10 +434,20 @@ bool RpcSession::addClientConnection(unique_fd fd) {
            if (mShutdownTrigger == nullptr) return false;
        }

    sp<RpcConnection> session = sp<RpcConnection>::make();
    session->fd = std::move(fd);
    mClientConnections.push_back(session);
    return true;
        connection->fd = std::move(fd);
        connection->exclusiveTid = gettid();
        mClientConnections.push_back(connection);
    }

    status_t status =
            mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));

    {
        std::lock_guard<std::mutex> _l(mMutex);
        connection->exclusiveTid = std::nullopt;
    }

    return status == OK;
}

bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
@@ -519,7 +550,9 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi
        // in regular binder, this would usually be a deadlock :)
        LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0,
                            "Session has no client connections. This is required for an RPC server "
                            "to make any non-nested (e.g. oneway or on another thread) calls.");
                            "to make any non-nested (e.g. oneway or on another thread) calls. "
                            "Use: %d. Server connections: %zu",
                            static_cast<int>(use), mSession->mServerConnections.size());

        LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
                       mSession->mClientConnections.size(), mSession->mServerConnections.size());
+21 −0
Original line number Diff line number Diff line
@@ -265,6 +265,27 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& sessi
    return OK;
}

status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
    RpcClientConnectionInit init{
            .msg = RPC_CONNECTION_INIT_OKAY,
    };
    return rpcSend(fd, session, "connection init", &init, sizeof(init));
}

status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
    RpcClientConnectionInit init;
    if (status_t status = rpcRec(fd, session, "connection init", &init, sizeof(init)); status != OK)
        return status;

    static_assert(sizeof(init.msg) == sizeof(RPC_CONNECTION_INIT_OKAY));
    if (0 != strncmp(init.msg, RPC_CONNECTION_INIT_OKAY, sizeof(init.msg))) {
        ALOGE("Connection init message unrecognized %.*s", static_cast<int>(sizeof(init.msg)),
              init.msg);
        return BAD_VALUE;
    }
    return OK;
}

sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
    Parcel data;
    data.markForRpc(session);
+3 −0
Original line number Diff line number Diff line
@@ -51,6 +51,9 @@ public:
    RpcState();
    ~RpcState();

    status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
    status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);

    // TODO(b/182940634): combine some special transactions into one "getServerInfo" call?
    sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session);
    status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
+16 −0
Original line number Diff line number Diff line
@@ -26,12 +26,28 @@ enum : uint8_t {
    RPC_CONNECTION_OPTION_REVERSE = 0x1,
};

/**
 * This is sent to an RpcServer in order to request a new connection is created,
 * either as part of a new session or an existing session
 */
struct RpcConnectionHeader {
    int32_t sessionId;
    uint8_t options;
    uint8_t reserved[3];
};

#define RPC_CONNECTION_INIT_OKAY "cci"

/**
 * Whenever a client connection is setup, this is sent as the initial
 * transaction. The main use of this is in order to control the timing for when
 * a reverse connection is setup.
 */
struct RpcClientConnectionInit {
    char msg[4];
    uint8_t reserved[4];
};

enum : uint32_t {
    /**
     * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected
Loading