Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 936fc19a authored by Steven Moreland's avatar Steven Moreland Committed by Gerrit Code Review
Browse files

Merge changes Id38049ed,I13bc9126,I9fbc7594

* changes:
  libbinder: handle ExclusiveSocket failure
  libbinder: RPC know when connections setup
  libbinder: RPC process oneway w/ 'tail call'
parents e27baffb 195edb85
Loading
Loading
Loading
Loading
+4 −2
Original line number Diff line number Diff line
@@ -307,13 +307,15 @@ void RpcServer::establishConnection(sp<RpcServer>&& server, base::unique_fd clie
        }

        detachGuard.Disable();
        session->preJoin(std::move(thisThread));
        session->preJoinThreadOwnership(std::move(thisThread));
    }

    auto setupResult = session->preJoinSetup(std::move(clientFd));

    // avoid strong cycle
    server = nullptr;

    RpcSession::join(std::move(session), std::move(clientFd));
    RpcSession::join(std::move(session), std::move(setupResult));
}

bool RpcServer::setupSocketServer(const RpcSocketAddress& addr) {
+111 −59
Original line number Diff line number Diff line
@@ -104,12 +104,18 @@ bool RpcSession::addNullDebuggingClient() {
}

// Fetches the root object from the remote end of this session.
// Returns nullptr if an exclusive connection could not be acquired
// (e.g. the session has no usable client connections).
sp<IBinder> RpcSession::getRootObject() {
    ExclusiveConnection connection;
    status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                ConnectionUse::CLIENT, &connection);
    if (status != OK) return nullptr;
    return state()->getRootObject(connection.fd(), sp<RpcSession>::fromExisting(this));
}

// Queries the maximum number of threads the remote end supports.
// On success writes the count into *maxThreads; otherwise propagates the
// error from connection acquisition or from the remote query.
status_t RpcSession::getRemoteMaxThreads(size_t* maxThreads) {
    ExclusiveConnection connection;
    status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                ConnectionUse::CLIENT, &connection);
    if (status != OK) return status;
    return state()->getMaxThreads(connection.fd(), sp<RpcSession>::fromExisting(this), maxThreads);
}

@@ -133,16 +139,22 @@ bool RpcSession::shutdownAndWait(bool wait) {

// Sends a transaction over this session. Oneway (async) calls use
// CLIENT_ASYNC connection selection so they rotate across connections;
// synchronous calls use CLIENT (which also permits reentrant use of a
// serving connection for nested transactions).
status_t RpcSession::transact(const sp<IBinder>& binder, uint32_t code, const Parcel& data,
                              Parcel* reply, uint32_t flags) {
    ExclusiveConnection connection;
    status_t status =
            ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                      (flags & IBinder::FLAG_ONEWAY) ? ConnectionUse::CLIENT_ASYNC
                                                                     : ConnectionUse::CLIENT,
                                      &connection);
    if (status != OK) return status;
    return state()->transact(connection.fd(), binder, code, data,
                             sp<RpcSession>::fromExisting(this), reply, flags);
}

// Tells the remote end to drop a strong reference on the given address.
// Uses the dedicated CLIENT_REFCOUNT connection selection mode.
status_t RpcSession::sendDecStrong(const RpcAddress& address) {
    ExclusiveConnection connection;
    status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                ConnectionUse::CLIENT_REFCOUNT, &connection);
    if (status != OK) return status;
    return state()->sendDecStrong(connection.fd(), sp<RpcSession>::fromExisting(this), address);
}

@@ -208,9 +220,12 @@ status_t RpcSession::readId() {

    int32_t id;

    ExclusiveConnection connection(sp<RpcSession>::fromExisting(this), ConnectionUse::CLIENT);
    status_t status =
            state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id);
    ExclusiveConnection connection;
    status_t status = ExclusiveConnection::find(sp<RpcSession>::fromExisting(this),
                                                ConnectionUse::CLIENT, &connection);
    if (status != OK) return status;

    status = state()->getSessionId(connection.fd(), sp<RpcSession>::fromExisting(this), &id);
    if (status != OK) return status;

    LOG_RPC_DETAIL("RpcSession %p has id %d", this, id);
@@ -236,7 +251,7 @@ void RpcSession::WaitForShutdownListener::waitForShutdown(std::unique_lock<std::
    }
}

void RpcSession::preJoin(std::thread thread) {
void RpcSession::preJoinThreadOwnership(std::thread thread) {
    LOG_ALWAYS_FATAL_IF(thread.get_id() != std::this_thread::get_id(), "Must own this thread");

    {
@@ -245,21 +260,37 @@ void RpcSession::preJoin(std::thread thread) {
    }
}

void RpcSession::join(sp<RpcSession>&& session, unique_fd client) {
RpcSession::PreJoinSetupResult RpcSession::preJoinSetup(base::unique_fd fd) {
    // must be registered to allow arbitrary client code executing commands to
    // be able to do nested calls (we can't only read from it)
    sp<RpcConnection> connection = session->assignServerToThisThread(std::move(client));
    sp<RpcConnection> connection = assignServerToThisThread(std::move(fd));

    status_t status =
            mState->readConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));

    return PreJoinSetupResult{
            .connection = std::move(connection),
            .status = status,
    };
}

void RpcSession::join(sp<RpcSession>&& session, PreJoinSetupResult&& setupResult) {
    sp<RpcConnection>& connection = setupResult.connection;

    if (setupResult.status == OK) {
        while (true) {
        status_t error = session->state()->getAndExecuteCommand(connection->fd, session,
            status_t status = session->state()->getAndExecuteCommand(connection->fd, session,
                                                                     RpcState::CommandType::ANY);

        if (error != OK) {
            if (status != OK) {
                LOG_RPC_DETAIL("Binder connection thread closing w/ status %s",
                           statusToString(error).c_str());
                               statusToString(status).c_str());
                break;
            }
        }
    } else {
        ALOGE("Connection failed to init, closing with status %s",
              statusToString(setupResult.status).c_str());
    }

    LOG_ALWAYS_FATAL_IF(!session->removeServerConnection(connection),
                        "bad state: connection object guaranteed to be in list");
@@ -381,14 +412,17 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t
                unique_fd fd = std::move(serverFd);
                // NOLINTNEXTLINE(performance-unnecessary-copy-initialization)
                sp<RpcSession> session = thiz;
                session->preJoin(std::move(thread));
                ownershipTransferred = true;
                joinCv.notify_one();
                session->preJoinThreadOwnership(std::move(thread));

                // only continue once we have a response or the connection fails
                auto setupResult = session->preJoinSetup(std::move(fd));

                ownershipTransferred = true;
                threadLock.unlock();
                joinCv.notify_one();
                // do not use & vars below

                RpcSession::join(std::move(session), std::move(fd));
                RpcSession::join(std::move(session), std::move(setupResult));
            });
            joinCv.wait(lock, [&] { return ownershipTransferred; });
            LOG_ALWAYS_FATAL_IF(!ownershipTransferred);
@@ -403,6 +437,8 @@ bool RpcSession::setupOneSocketConnection(const RpcSocketAddress& addr, int32_t
}

bool RpcSession::addClientConnection(unique_fd fd) {
    sp<RpcConnection> connection = sp<RpcConnection>::make();
    {
        std::lock_guard<std::mutex> _l(mMutex);

        // first client connection added, but setForServer not called, so
@@ -413,10 +449,20 @@ bool RpcSession::addClientConnection(unique_fd fd) {
            if (mShutdownTrigger == nullptr) return false;
        }

    sp<RpcConnection> session = sp<RpcConnection>::make();
    session->fd = std::move(fd);
    mClientConnections.push_back(session);
    return true;
        connection->fd = std::move(fd);
        connection->exclusiveTid = gettid();
        mClientConnections.push_back(connection);
    }

    status_t status =
            mState->sendConnectionInit(connection->fd, sp<RpcSession>::fromExisting(this));

    {
        std::lock_guard<std::mutex> _l(mMutex);
        connection->exclusiveTid = std::nullopt;
    }

    return status == OK;
}

bool RpcSession::setForServer(const wp<RpcServer>& server, const wp<EventListener>& eventListener,
@@ -462,13 +508,16 @@ bool RpcSession::removeServerConnection(const sp<RpcConnection>& connection) {
    return false;
}

RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& session,
                                                     ConnectionUse use)
      : mSession(session) {
status_t RpcSession::ExclusiveConnection::find(const sp<RpcSession>& session, ConnectionUse use,
                                               ExclusiveConnection* connection) {
    connection->mSession = session;
    connection->mConnection = nullptr;
    connection->mReentrant = false;

    pid_t tid = gettid();
    std::unique_lock<std::mutex> _l(mSession->mMutex);
    std::unique_lock<std::mutex> _l(session->mMutex);

    mSession->mWaitingThreads++;
    session->mWaitingThreads++;
    while (true) {
        sp<RpcConnection> exclusive;
        sp<RpcConnection> available;
@@ -476,8 +525,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi
        // CHECK FOR DEDICATED CLIENT SOCKET
        //
        // A server/looper should always use a dedicated connection if available
        findConnection(tid, &exclusive, &available, mSession->mClientConnections,
                       mSession->mClientConnectionsOffset);
        findConnection(tid, &exclusive, &available, session->mClientConnections,
                       session->mClientConnectionsOffset);

        // WARNING: this assumes a server cannot request its client to send
        // a transaction, as mServerConnections is excluded below.
@@ -490,8 +539,8 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi
        // command. So, we move to considering the second available thread
        // for subsequent calls.
        if (use == ConnectionUse::CLIENT_ASYNC && (exclusive != nullptr || available != nullptr)) {
            mSession->mClientConnectionsOffset =
                    (mSession->mClientConnectionsOffset + 1) % mSession->mClientConnections.size();
            session->mClientConnectionsOffset =
                    (session->mClientConnectionsOffset + 1) % session->mClientConnections.size();
        }

        // USE SERVING SOCKET (for nested transaction)
@@ -499,33 +548,36 @@ RpcSession::ExclusiveConnection::ExclusiveConnection(const sp<RpcSession>& sessi
        // asynchronous calls cannot be nested
        if (use != ConnectionUse::CLIENT_ASYNC) {
            // server connections are always assigned to a thread
            findConnection(tid, &exclusive, nullptr /*available*/, mSession->mServerConnections,
            findConnection(tid, &exclusive, nullptr /*available*/, session->mServerConnections,
                           0 /* index hint */);
        }

        // if our thread is already using a connection, prioritize using that
        if (exclusive != nullptr) {
            mConnection = exclusive;
            mReentrant = true;
            connection->mConnection = exclusive;
            connection->mReentrant = true;
            break;
        } else if (available != nullptr) {
            mConnection = available;
            mConnection->exclusiveTid = tid;
            connection->mConnection = available;
            connection->mConnection->exclusiveTid = tid;
            break;
        }

        // TODO(b/185167543): this should return an error, rather than crash a
        // server
        // in regular binder, this would usually be a deadlock :)
        LOG_ALWAYS_FATAL_IF(mSession->mClientConnections.size() == 0,
                            "Session has no client connections. This is required for an RPC server "
                            "to make any non-nested (e.g. oneway or on another thread) calls.");
        if (session->mClientConnections.size() == 0) {
            ALOGE("Session has no client connections. This is required for an RPC server to make "
                  "any non-nested (e.g. oneway or on another thread) calls. Use: %d. Server "
                  "connections: %zu",
                  static_cast<int>(use), session->mServerConnections.size());
            return WOULD_BLOCK;
        }

        LOG_RPC_DETAIL("No available connections (have %zu clients and %zu servers). Waiting...",
                       mSession->mClientConnections.size(), mSession->mServerConnections.size());
        mSession->mAvailableConnectionCv.wait(_l);
                       session->mClientConnections.size(), session->mServerConnections.size());
        session->mAvailableConnectionCv.wait(_l);
    }
    mSession->mWaitingThreads--;
    session->mWaitingThreads--;

    return OK;
}

void RpcSession::ExclusiveConnection::findConnection(pid_t tid, sp<RpcConnection>* exclusive,
@@ -559,7 +611,7 @@ RpcSession::ExclusiveConnection::~ExclusiveConnection() {
    // reentrant use of a connection means something less deep in the call stack
    // is using this fd, and it retains the right to it. So, we don't give up
    // exclusive ownership, and no thread is freed.
    if (!mReentrant) {
    if (!mReentrant && mConnection != nullptr) {
        std::unique_lock<std::mutex> _l(mSession->mMutex);
        mConnection->exclusiveTid = std::nullopt;
        if (mSession->mWaitingThreads > 0) {
+33 −7
Original line number Diff line number Diff line
@@ -265,6 +265,27 @@ status_t RpcState::rpcRec(const base::unique_fd& fd, const sp<RpcSession>& sessi
    return OK;
}

// Sends the initial handshake message on a freshly set-up client connection.
// The remote end validates this in readConnectionInit().
status_t RpcState::sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
    const RpcClientConnectionInit message{
            .msg = RPC_CONNECTION_INIT_OKAY,
    };
    return rpcSend(fd, session, "connection init", &message, sizeof(message));
}

// Reads and validates the handshake message sent by the peer's
// sendConnectionInit(). Returns BAD_VALUE if the message does not match
// RPC_CONNECTION_INIT_OKAY, or the rpcRec error if the read fails.
status_t RpcState::readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session) {
    RpcClientConnectionInit received;
    status_t status = rpcRec(fd, session, "connection init", &received, sizeof(received));
    if (status != OK) return status;

    // wire-format field must be exactly as large as the expected literal
    static_assert(sizeof(received.msg) == sizeof(RPC_CONNECTION_INIT_OKAY));
    if (strncmp(received.msg, RPC_CONNECTION_INIT_OKAY, sizeof(received.msg)) != 0) {
        ALOGE("Connection init message unrecognized %.*s", static_cast<int>(sizeof(received.msg)),
              received.msg);
        return BAD_VALUE;
    }
    return OK;
}

sp<IBinder> RpcState::getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session) {
    Parcel data;
    data.markForRpc(session);
@@ -565,7 +586,7 @@ status_t RpcState::processTransact(const base::unique_fd& fd, const sp<RpcSessio
        status != OK)
        return status;

    return processTransactInternal(fd, session, std::move(transactionData), nullptr /*targetRef*/);
    return processTransactInternal(fd, session, std::move(transactionData));
}

static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t dataSize,
@@ -578,7 +599,13 @@ static void do_nothing_to_transact_data(Parcel* p, const uint8_t* data, size_t d
}

status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<RpcSession>& session,
                                           CommandData transactionData, sp<IBinder>&& targetRef) {
                                           CommandData transactionData) {
    // for 'recursive' calls to this, we have already read and processed the
    // binder from the transaction data and taken reference counts into account,
    // so it is cached here.
    sp<IBinder> targetRef;
processTransactInternalTailCall:

    if (transactionData.size() < sizeof(RpcWireTransaction)) {
        ALOGE("Expecting %zu but got %zu bytes for RpcWireTransaction. Terminating!",
              sizeof(RpcWireTransaction), transactionData.size());
@@ -751,13 +778,12 @@ status_t RpcState::processTransactInternal(const base::unique_fd& fd, const sp<R
                // - gotta go fast
                auto& todo = const_cast<BinderNode::AsyncTodo&>(it->second.asyncTodo.top());

                CommandData nextData = std::move(todo.data);
                sp<IBinder> nextRef = std::move(todo.ref);
                // reset up arguments
                transactionData = std::move(todo.data);
                targetRef = std::move(todo.ref);

                it->second.asyncTodo.pop();
                _l.unlock();
                return processTransactInternal(fd, session, std::move(nextData),
                                               std::move(nextRef));
                goto processTransactInternalTailCall;
            }
        }
        return OK;
+4 −2
Original line number Diff line number Diff line
@@ -51,6 +51,9 @@ public:
    RpcState();
    ~RpcState();

    status_t sendConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);
    status_t readConnectionInit(const base::unique_fd& fd, const sp<RpcSession>& session);

    // TODO(b/182940634): combine some special transactions into one "getServerInfo" call?
    sp<IBinder> getRootObject(const base::unique_fd& fd, const sp<RpcSession>& session);
    status_t getMaxThreads(const base::unique_fd& fd, const sp<RpcSession>& session,
@@ -144,8 +147,7 @@ private:
                                           const RpcWireHeader& command);
    [[nodiscard]] status_t processTransactInternal(const base::unique_fd& fd,
                                                   const sp<RpcSession>& session,
                                                   CommandData transactionData,
                                                   sp<IBinder>&& targetRef);
                                                   CommandData transactionData);
    [[nodiscard]] status_t processDecStrong(const base::unique_fd& fd,
                                            const sp<RpcSession>& session,
                                            const RpcWireHeader& command);
+16 −0
Original line number Diff line number Diff line
@@ -26,12 +26,28 @@ enum : uint8_t {
    RPC_CONNECTION_OPTION_REVERSE = 0x1,
};

/**
 * This is sent to an RpcServer in order to request a new connection is created,
 * either as part of a new session or an existing session
 */
struct RpcConnectionHeader {
    int32_t sessionId;   // session to attach to; semantics of "new session" value set by server — see RpcServer
    uint8_t options;     // bitmask of RPC_CONNECTION_OPTION_* flags (e.g. RPC_CONNECTION_OPTION_REVERSE)
    uint8_t reserved[3]; // unused; presumably kept zero for forward compatibility — TODO confirm
};

// Expected contents of RpcClientConnectionInit::msg ("cci" + NUL, 4 bytes);
// validated with strncmp in RpcState::readConnectionInit.
#define RPC_CONNECTION_INIT_OKAY "cci"

/**
 * Whenever a client connection is setup, this is sent as the initial
 * transaction. The main use of this is in order to control the timing for when
 * a reverse connection is setup.
 */
struct RpcClientConnectionInit {
    char msg[4];         // must equal RPC_CONNECTION_INIT_OKAY, or the connection is rejected (BAD_VALUE)
    uint8_t reserved[4]; // unused; presumably kept zero for forward compatibility — TODO confirm
};

enum : uint32_t {
    /**
     * follows is RpcWireTransaction, if flags != oneway, reply w/ RPC_COMMAND_REPLY expected
Loading