Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1f44f98d authored by Yifan Hong's avatar Yifan Hong
Browse files

binder: RpcSession limit outgoing threads

Similar to the number of incoming threads,
the number of outgoing threads can be limited via
RpcSession::setMaxOutgoingThreads(). If set, only
min(maxOutgoingThreads, remoteMaxThreads) outgoing threads
are instantiated.

Test: binderRpcTest
Bug: 194225767

Change-Id: I15686bae4317d0ced5af999f3a3d21f9a03037e1
parent 8347ab59
Loading
Loading
Loading
Loading
+24 −1
Original line number Diff line number Diff line
@@ -90,6 +90,20 @@ size_t RpcSession::getMaxIncomingThreads() {
    return mMaxIncomingThreads;
}

void RpcSession::setMaxOutgoingThreads(size_t threads) {
    std::lock_guard<std::mutex> _l(mMutex);
    LOG_ALWAYS_FATAL_IF(!mConnections.mOutgoing.empty() || !mConnections.mIncoming.empty(),
                        "Must set max outgoing threads before setting up connections, but has %zu "
                        "client(s) and %zu server(s)",
                        mConnections.mOutgoing.size(), mConnections.mIncoming.size());
    mMaxOutgoingThreads = threads;
}

size_t RpcSession::getMaxOutgoingThreads() {
    std::lock_guard<std::mutex> _l(mMutex);
    return mMaxOutgoingThreads;
}

bool RpcSession::setProtocolVersion(uint32_t version) {
    if (version >= RPC_WIRE_PROTOCOL_VERSION_NEXT &&
        version != RPC_WIRE_PROTOCOL_VERSION_EXPERIMENTAL) {
@@ -473,6 +487,12 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector<
        return status;
    }

    size_t outgoingThreads = std::min(numThreadsAvailable, mMaxOutgoingThreads);
    ALOGI_IF(outgoingThreads != numThreadsAvailable,
             "Server hints client to start %zu outgoing threads, but client will only start %zu "
             "because it is preconfigured to start at most %zu outgoing threads.",
             numThreadsAvailable, outgoingThreads, mMaxOutgoingThreads);

    // TODO(b/189955605): we should add additional sessions dynamically
    // instead of all at once - the other side should be responsible for setting
    // up additional connections. We need to create at least one (unless 0 are
@@ -480,7 +500,10 @@ status_t RpcSession::setupClient(const std::function<status_t(const std::vector<
    // any requests at all.

    // we've already setup one client
    for (size_t i = 0; i + 1 < numThreadsAvailable; i++) {
    LOG_RPC_DETAIL("RpcSession::setupClient() instantiating %zu outgoing (server max: %zu) and %zu "
                   "incoming threads",
                   outgoingThreads, numThreadsAvailable, mMaxIncomingThreads);
    for (size_t i = 0; i + 1 < outgoingThreads; i++) {
        if (status_t status = connectAndInit(mId, false /*incoming*/); status != OK) return status;
    }

+15 −0
Original line number Diff line number Diff line
@@ -50,6 +50,8 @@ constexpr uint32_t RPC_WIRE_PROTOCOL_VERSION = RPC_WIRE_PROTOCOL_VERSION_EXPERIM
 */
class RpcSession final : public virtual RefBase {
public:
    static constexpr size_t kDefaultMaxOutgoingThreads = 10;

    // Create an RpcSession with default configuration (raw sockets).
    static sp<RpcSession> make();

@@ -71,6 +73,18 @@ public:
    void setMaxIncomingThreads(size_t threads);
    size_t getMaxIncomingThreads();

    /**
     * Set the maximum number of outgoing threads allowed to be made.
     * By default, this is |kDefaultMaxOutgoingThreads|. This must be called before setting up this
     * connection as a client.
     *
     * This limits the number of outgoing threads on top of the remote peer setting. This RpcSession
     * will only instantiate |min(maxOutgoingThreads, remoteMaxThreads)| outgoing threads, where
     * |remoteMaxThreads| can be retrieved from the remote peer via |getRemoteMaxThreads()|.
     */
    void setMaxOutgoingThreads(size_t threads);
    size_t getMaxOutgoingThreads();

    /**
     * By default, the minimum of the supported versions of the client and the
     * server will be used. Usually, this API should only be used for debugging.
@@ -308,6 +322,7 @@ private:
    std::mutex mMutex; // for all below

    size_t mMaxIncomingThreads = 0;
    size_t mMaxOutgoingThreads = kDefaultMaxOutgoingThreads;
    std::optional<uint32_t> mProtocolVersion;

    std::condition_variable mAvailableConnectionCv; // for mWaitingThreads
+27 −11
Original line number Diff line number Diff line
@@ -481,6 +481,7 @@ public:
        size_t numThreads = 1;
        size_t numSessions = 1;
        size_t numIncomingConnections = 0;
        size_t numOutgoingConnections = SIZE_MAX;
    };

    static inline std::string PrintParamInfo(const testing::TestParamInfo<ParamType>& info) {
@@ -614,6 +615,7 @@ public:

        for (const auto& session : sessions) {
            session->setMaxIncomingThreads(options.numIncomingConnections);
            session->setMaxOutgoingThreads(options.numOutgoingConnections);

            switch (socketType) {
                case SocketType::PRECONNECTED:
@@ -655,6 +657,9 @@ public:

        return ret;
    }

    void testThreadPoolOverSaturated(sp<IBinderRpcTest> iface, size_t numCalls,
                                     size_t sleepMs = 500);
};

TEST_P(BinderRpc, Ping) {
@@ -996,28 +1001,39 @@ TEST_P(BinderRpc, ThreadPoolGreaterThanEqualRequested) {
    for (auto& t : ts) t.join();
}

TEST_P(BinderRpc, ThreadPoolOverSaturated) {
    constexpr size_t kNumThreads = 10;
    constexpr size_t kNumCalls = kNumThreads + 3;
    constexpr size_t kSleepMs = 500;

    auto proc = createRpcTestSocketServerProcess({.numThreads = kNumThreads});

void BinderRpc::testThreadPoolOverSaturated(sp<IBinderRpcTest> iface, size_t numCalls,
                                            size_t sleepMs) {
    size_t epochMsBefore = epochMillis();

    std::vector<std::thread> ts;
    for (size_t i = 0; i < kNumCalls; i++) {
        ts.push_back(std::thread([&] { proc.rootIface->sleepMs(kSleepMs); }));
    for (size_t i = 0; i < numCalls; i++) {
        ts.push_back(std::thread([&] { iface->sleepMs(sleepMs); }));
    }

    for (auto& t : ts) t.join();

    size_t epochMsAfter = epochMillis();

    EXPECT_GE(epochMsAfter, epochMsBefore + 2 * kSleepMs);
    EXPECT_GE(epochMsAfter, epochMsBefore + 2 * sleepMs);

    // Potential flake, but make sure calls are handled in parallel.
    EXPECT_LE(epochMsAfter, epochMsBefore + 3 * kSleepMs);
    EXPECT_LE(epochMsAfter, epochMsBefore + 3 * sleepMs);
}

TEST_P(BinderRpc, ThreadPoolOverSaturated) {
    constexpr size_t kNumThreads = 10;
    constexpr size_t kNumCalls = kNumThreads + 3;
    auto proc = createRpcTestSocketServerProcess({.numThreads = kNumThreads});
    testThreadPoolOverSaturated(proc.rootIface, kNumCalls);
}

TEST_P(BinderRpc, ThreadPoolLimitOutgoing) {
    constexpr size_t kNumThreads = 20;
    constexpr size_t kNumOutgoingConnections = 10;
    constexpr size_t kNumCalls = kNumOutgoingConnections + 3;
    auto proc = createRpcTestSocketServerProcess(
            {.numThreads = kNumThreads, .numOutgoingConnections = kNumOutgoingConnections});
    testThreadPoolOverSaturated(proc.rootIface, kNumCalls);
}

TEST_P(BinderRpc, ThreadingStressTest) {