Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit da5e6078 authored by Arthur Ishiguro's avatar Arthur Ishiguro
Browse files

Refactors client acceptance logic in RpcServer::join

Refactors logic in RpcServer::join to another function to prepare for sharing the logic with a non-blocking version (to be added in a follow-up CL).

Bug: 412692091
Flag: EXEMPT minor bug fix
Test: atest binderRpcTest pass
Change-Id: I3d165ae7a8e084a451f8f4dd2eb2412ae56b6e98
parent 73d3644e
Loading
Loading
Loading
Loading
+43 −37
Original line number Diff line number Diff line
@@ -273,56 +273,62 @@ void RpcServer::join() {

    status_t status;
    while ((status = mShutdownTrigger->triggerablePoll(mServer, POLLIN)) == OK) {
        status = acceptConnection(RpcSession::join);
        if (status == DEAD_OBJECT) {
            break;
        }
    }
    LOG_RPC_DETAIL("RpcServer::join exiting with %s", statusToString(status).c_str());

    if constexpr (kEnableRpcThreads) {
        RpcMutexLockGuard _l(mLock);
        mJoinThreadRunning = false;
    } else {
        // Multi-threaded builds clear this in shutdown(), but we need it valid
        // so the loop above exits cleanly
        mShutdownTrigger = nullptr;
    }
    mShutdownCv.notify_all();
}

status_t RpcServer::acceptConnection(
        std::function<void(sp<RpcSession>&&, RpcSession::PreJoinSetupResult&&)>&& joinFn) {
    RpcTransportFd clientFd;
    std::array<uint8_t, kRpcAddressSize> addr;
    static_assert(addr.size() >= sizeof(sockaddr_storage), "kRpcAddressSize is too small");
    socklen_t addrLen = addr.size();

        RpcTransportFd clientSocket;
        if ((status = mAcceptFn(*this, &clientSocket)) != OK) {
            if (status == DEAD_OBJECT) {
                break;
            } else {
    status_t status;
    if ((status = mAcceptFn(*this, &clientFd)) != OK) {
        if (status != DEAD_OBJECT) {
            ALOGE("Accept returned error %s", statusToString(status).c_str());
                continue;
        }
        return status;
    }

        LOG_RPC_DETAIL("accept on fd %d yields fd %d", mServer.fd.get(), clientSocket.fd.get());
    LOG_RPC_DETAIL("accept on fd %d yields fd %d", mServer.fd.get(), clientFd.fd.get());

        if (getpeername(clientSocket.fd.get(), reinterpret_cast<sockaddr*>(addr.data()),
                        &addrLen)) {
    if (getpeername(clientFd.fd.get(), reinterpret_cast<sockaddr*>(addr.data()), &addrLen)) {
        ALOGE("Could not getpeername socket: %s", strerror(errno));
            continue;
        return EINVAL;
    }

    if (mConnectionFilter != nullptr && !mConnectionFilter(addr.data(), addrLen)) {
            ALOGE("Dropped client connection fd %d", clientSocket.fd.get());
            continue;
        ALOGE("Dropped client connection fd %d", clientFd.fd.get());
        return EINVAL;
    }

    {
        RpcMutexLockGuard _l(mLock);
        RpcMaybeThread thread =
                    RpcMaybeThread(&RpcServer::establishConnection,
                                   sp<RpcServer>::fromExisting(this), std::move(clientSocket), addr,
                                   addrLen, RpcSession::join);

                RpcMaybeThread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this),
                               std::move(clientFd), addr, addrLen, std::move(joinFn));
        auto& threadRef = mConnectingThreads[thread.get_id()];
        threadRef = std::move(thread);
        rpcJoinIfSingleThreaded(threadRef);
    }
    }
    LOG_RPC_DETAIL("RpcServer::join exiting with %s", statusToString(status).c_str());

    if constexpr (kEnableRpcThreads) {
        RpcMutexLockGuard _l(mLock);
        mJoinThreadRunning = false;
    } else {
        // Multi-threaded builds clear this in shutdown(), but we need it valid
        // so the loop above exits cleanly
        mShutdownTrigger = nullptr;
    }
    mShutdownCv.notify_all();
    return OK;
}

bool RpcServer::shutdown() {
+2 −0
Original line number Diff line number Diff line
@@ -268,6 +268,8 @@ private:
    static status_t recvmsgSocketConnection(const RpcServer& server, RpcTransportFd* out);

    [[nodiscard]] status_t setupSocketServer(const RpcSocketAddress& address);
    [[nodiscard]] status_t acceptConnection(
            std::function<void(sp<RpcSession>&&, RpcSession::PreJoinSetupResult&&)>&& joinFn);

    const std::unique_ptr<RpcTransportCtx> mCtx;
    size_t mMaxThreads = 1;