Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c32aeb35 authored by Arthur Ishiguro's avatar Arthur Ishiguro
Browse files

Revert "Revert "Refactors client acceptance logic in RpcServer::..."

Revert submission 34292148-revert-34231615-LLUPMXXFBH

Reason for revert: Attempt to reland CLs after fixing the issue
Bug: 410035138

Reverted changes: /q/submissionid:34292148-revert-34231615-LLUPMXXFBH

Change-Id: Ib2b712af289c0da7e9bb5fb04ca435aa232419a3
parent 7f87523d
Loading
Loading
Loading
Loading
+43 −37
Original line number Original line Diff line number Diff line
@@ -273,56 +273,62 @@ void RpcServer::join() {


    status_t status;
    status_t status;
    while ((status = mShutdownTrigger->triggerablePoll(mServer, POLLIN)) == OK) {
    while ((status = mShutdownTrigger->triggerablePoll(mServer, POLLIN)) == OK) {
        status = acceptConnection(RpcSession::join);
        if (status == DEAD_OBJECT) {
            break;
        }
    }
    LOG_RPC_DETAIL("RpcServer::join exiting with %s", statusToString(status).c_str());

    if constexpr (kEnableRpcThreads) {
        RpcMutexLockGuard _l(mLock);
        mJoinThreadRunning = false;
    } else {
        // Multi-threaded builds clear this in shutdown(), but we need it valid
        // so the loop above exits cleanly
        mShutdownTrigger = nullptr;
    }
    mShutdownCv.notify_all();
}

status_t RpcServer::acceptConnection(
        std::function<void(sp<RpcSession>&&, RpcSession::PreJoinSetupResult&&)>&& joinFn) {
    RpcTransportFd clientFd;
    std::array<uint8_t, kRpcAddressSize> addr;
    std::array<uint8_t, kRpcAddressSize> addr;
    static_assert(addr.size() >= sizeof(sockaddr_storage), "kRpcAddressSize is too small");
    static_assert(addr.size() >= sizeof(sockaddr_storage), "kRpcAddressSize is too small");
    socklen_t addrLen = addr.size();
    socklen_t addrLen = addr.size();


        RpcTransportFd clientSocket;
    status_t status;
        if ((status = mAcceptFn(*this, &clientSocket)) != OK) {
    if ((status = mAcceptFn(*this, &clientFd)) != OK) {
            if (status == DEAD_OBJECT) {
        if (status != DEAD_OBJECT) {
                break;
            } else {
            ALOGE("Accept returned error %s", statusToString(status).c_str());
            ALOGE("Accept returned error %s", statusToString(status).c_str());
                continue;
        }
        }
        return status;
    }
    }


        LOG_RPC_DETAIL("accept on fd %d yields fd %d", mServer.fd.get(), clientSocket.fd.get());
    LOG_RPC_DETAIL("accept on fd %d yields fd %d", mServer.fd.get(), clientFd.fd.get());


        if (getpeername(clientSocket.fd.get(), reinterpret_cast<sockaddr*>(addr.data()),
    if (getpeername(clientFd.fd.get(), reinterpret_cast<sockaddr*>(addr.data()), &addrLen)) {
                        &addrLen)) {
        ALOGE("Could not getpeername socket: %s", strerror(errno));
        ALOGE("Could not getpeername socket: %s", strerror(errno));
            continue;
        return EINVAL;
    }
    }


    if (mConnectionFilter != nullptr && !mConnectionFilter(addr.data(), addrLen)) {
    if (mConnectionFilter != nullptr && !mConnectionFilter(addr.data(), addrLen)) {
            ALOGE("Dropped client connection fd %d", clientSocket.fd.get());
        ALOGE("Dropped client connection fd %d", clientFd.fd.get());
            continue;
        return EINVAL;
    }
    }


    {
    {
        RpcMutexLockGuard _l(mLock);
        RpcMutexLockGuard _l(mLock);
        RpcMaybeThread thread =
        RpcMaybeThread thread =
                    RpcMaybeThread(&RpcServer::establishConnection,
                RpcMaybeThread(&RpcServer::establishConnection, sp<RpcServer>::fromExisting(this),
                                   sp<RpcServer>::fromExisting(this), std::move(clientSocket), addr,
                               std::move(clientFd), addr, addrLen, std::move(joinFn));
                                   addrLen, RpcSession::join);

        auto& threadRef = mConnectingThreads[thread.get_id()];
        auto& threadRef = mConnectingThreads[thread.get_id()];
        threadRef = std::move(thread);
        threadRef = std::move(thread);
        rpcJoinIfSingleThreaded(threadRef);
        rpcJoinIfSingleThreaded(threadRef);
    }
    }
    }
    LOG_RPC_DETAIL("RpcServer::join exiting with %s", statusToString(status).c_str());


    if constexpr (kEnableRpcThreads) {
    return OK;
        RpcMutexLockGuard _l(mLock);
        mJoinThreadRunning = false;
    } else {
        // Multi-threaded builds clear this in shutdown(), but we need it valid
        // so the loop above exits cleanly
        mShutdownTrigger = nullptr;
    }
    mShutdownCv.notify_all();
}
}


bool RpcServer::shutdown() {
bool RpcServer::shutdown() {
+2 −0
Original line number Original line Diff line number Diff line
@@ -268,6 +268,8 @@ private:
    static status_t recvmsgSocketConnection(const RpcServer& server, RpcTransportFd* out);
    static status_t recvmsgSocketConnection(const RpcServer& server, RpcTransportFd* out);


    [[nodiscard]] status_t setupSocketServer(const RpcSocketAddress& address);
    [[nodiscard]] status_t setupSocketServer(const RpcSocketAddress& address);
    [[nodiscard]] status_t acceptConnection(
            std::function<void(sp<RpcSession>&&, RpcSession::PreJoinSetupResult&&)>&& joinFn);


    const std::unique_ptr<RpcTransportCtx> mCtx;
    const std::unique_ptr<RpcTransportCtx> mCtx;
    size_t mMaxThreads = 1;
    size_t mMaxThreads = 1;