Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a445902a authored by Elie Kheirallah, committed by Automerger Merge Worker
Browse files

Merge "Tracking number of threads in threadpools. Added tests for max total...

Merge "Tracking number of threads in threadpools. Added tests for max total thread count." am: 85f75bc9 am: dccd8123 am: e24c255f

Original change: https://android-review.googlesource.com/c/platform/frameworks/native/+/2072121



Change-Id: I81b427776d2920e871854e219a6dc7a07eb12e49
Signed-off-by: Automerger Merge Worker <android-build-automerger-merge-worker@system.gserviceaccount.com>
parents 689dff4a e24c255f
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -539,7 +539,7 @@ status_t BBinder::setRpcClientDebug(android::base::unique_fd socketFd,
        return UNEXPECTED_NULL;
    }

    size_t binderThreadPoolMaxCount = ProcessState::self()->getThreadPoolMaxThreadCount();
    size_t binderThreadPoolMaxCount = ProcessState::self()->getThreadPoolMaxTotalThreadCount();
    if (binderThreadPoolMaxCount <= 1) {
        ALOGE("%s: ProcessState thread pool max count is %zu. RPC is disabled for this service "
              "because RPC requires the service to support multithreading.",
+13 −1
Original line number Diff line number Diff line
@@ -638,7 +638,9 @@ void IPCThreadState::processPostWriteDerefs()
void IPCThreadState::joinThreadPool(bool isMain)
{
    LOG_THREADPOOL("**** THREAD %p (PID %d) IS JOINING THE THREAD POOL\n", (void*)pthread_self(), getpid());

    pthread_mutex_lock(&mProcess->mThreadCountLock);
    mProcess->mCurrentThreads++;
    pthread_mutex_unlock(&mProcess->mThreadCountLock);
    mOut.writeInt32(isMain ? BC_ENTER_LOOPER : BC_REGISTER_LOOPER);

    mIsLooper = true;
@@ -666,6 +668,13 @@ void IPCThreadState::joinThreadPool(bool isMain)
    mOut.writeInt32(BC_EXIT_LOOPER);
    mIsLooper = false;
    talkWithDriver(false);
    pthread_mutex_lock(&mProcess->mThreadCountLock);
    LOG_ALWAYS_FATAL_IF(mProcess->mCurrentThreads == 0,
                        "Threadpool thread count = 0. Thread cannot exist and exit in empty "
                        "threadpool\n"
                        "Misconfiguration. Increase threadpool max threads configuration\n");
    mProcess->mCurrentThreads--;
    pthread_mutex_unlock(&mProcess->mThreadCountLock);
}

status_t IPCThreadState::setupPolling(int* fd)
@@ -677,6 +686,9 @@ status_t IPCThreadState::setupPolling(int* fd)
    mOut.writeInt32(BC_ENTER_LOOPER);
    flushCommands();
    *fd = mProcess->mDriverFD;
    pthread_mutex_lock(&mProcess->mThreadCountLock);
    mProcess->mCurrentThreads++;
    pthread_mutex_unlock(&mProcess->mThreadCountLock);
    return 0;
}

+14 −4
Original line number Diff line number Diff line
@@ -187,7 +187,6 @@ void ProcessState::startThreadPool()
            ALOGW("Extra binder thread started, but 0 threads requested. Do not use "
                  "*startThreadPool when zero threads are requested.");
        }

        mThreadPoolStarted = true;
        spawnPooledThread(true);
    }
@@ -391,6 +390,7 @@ void ProcessState::spawnPooledThread(bool isMain)
        ALOGV("Spawning new pooled thread, name=%s\n", name.string());
        sp<Thread> t = sp<PoolThread>::make(isMain);
        t->run(name.string());
        mKernelStartedThreads++;
    }
}

@@ -407,12 +407,20 @@ status_t ProcessState::setThreadPoolMaxThreadCount(size_t maxThreads) {
    return result;
}

size_t ProcessState::getThreadPoolMaxThreadCount() const {
size_t ProcessState::getThreadPoolMaxTotalThreadCount() const {
    // may actually be one more than this, if join is called
    if (mThreadPoolStarted) return mMaxThreads;
    if (mThreadPoolStarted) {
        return mCurrentThreads < mKernelStartedThreads
                ? mMaxThreads
                : mMaxThreads + mCurrentThreads - mKernelStartedThreads;
    }
    // must not be initialized or maybe has poll thread setup, we
    // currently don't track this in libbinder
    return 0;
    LOG_ALWAYS_FATAL_IF(mKernelStartedThreads != 0,
                        "Expecting 0 kernel started threads but have"
                        " %zu",
                        mKernelStartedThreads);
    return mCurrentThreads;
}

#define DRIVER_FEATURES_PATH "/dev/binderfs/features/"
@@ -498,6 +506,8 @@ ProcessState::ProcessState(const char* driver)
        mExecutingThreadsCount(0),
        mWaitingForThreads(0),
        mMaxThreads(DEFAULT_MAX_BINDER_THREADS),
        mCurrentThreads(0),
        mKernelStartedThreads(0),
        mStarvationStartTimeMs(0),
        mForked(false),
        mThreadPoolStarted(false),
+9 −5
Original line number Diff line number Diff line
@@ -84,11 +84,11 @@ public:
    void setCallRestriction(CallRestriction restriction);

    /**
     * Get the max number of threads that the kernel can start.
     *
     * Note: this is the lower bound. Additional threads may be started.
     * Get the max number of threads that have joined the thread pool.
     * This includes kernel started threads, user joined threads and polling
     * threads if used.
     */
    size_t getThreadPoolMaxThreadCount() const;
    size_t getThreadPoolMaxTotalThreadCount() const;

    enum class DriverFeature {
        ONEWAY_SPAM_DETECTION,
@@ -133,8 +133,12 @@ private:
    size_t mExecutingThreadsCount;
    // Number of threads calling IPCThreadState::blockUntilThreadAvailable()
    size_t mWaitingForThreads;
    // Maximum number for binder threads allowed for this process.
    // Maximum number of lazy threads to be started in the threadpool by the kernel.
    size_t mMaxThreads;
    // Current number of threads inside the thread pool.
    size_t mCurrentThreads;
    // Current number of pooled threads inside the thread pool.
    size_t mKernelStartedThreads;
    // Time when thread pool was emptied
    int64_t mStarvationStartTimeMs;

+109 −0
Original line number Diff line number Diff line
@@ -82,6 +82,7 @@ static char binderserverarg[] = "--binderserver";
static constexpr int kSchedPolicy = SCHED_RR;
static constexpr int kSchedPriority = 7;
static constexpr int kSchedPriorityMore = 8;
static constexpr int kKernelThreads = 15;

static String16 binderLibTestServiceName = String16("test.binderLib");

@@ -115,6 +116,12 @@ enum BinderLibTestTranscationCode {
    BINDER_LIB_TEST_ECHO_VECTOR,
    BINDER_LIB_TEST_REJECT_OBJECTS,
    BINDER_LIB_TEST_CAN_GET_SID,
    BINDER_LIB_TEST_GET_MAX_THREAD_COUNT,
    BINDER_LIB_TEST_SET_MAX_THREAD_COUNT,
    BINDER_LIB_TEST_LOCK_UNLOCK,
    BINDER_LIB_TEST_PROCESS_LOCK,
    BINDER_LIB_TEST_UNLOCK_AFTER_MS,
    BINDER_LIB_TEST_PROCESS_TEMPORARY_LOCK
};

pid_t start_server_process(int arg2, bool usePoll = false)
@@ -1232,6 +1239,76 @@ TEST(ServiceNotifications, Unregister) {
    EXPECT_EQ(sm->unregisterForNotifications(String16("RogerRafa"), cb), OK);
}

TEST_F(BinderLibTest, ThreadPoolAvailableThreads) {
    Parcel data, reply;
    sp<IBinder> server = addServer();
    ASSERT_TRUE(server != nullptr);
    EXPECT_THAT(server->transact(BINDER_LIB_TEST_GET_MAX_THREAD_COUNT, data, &reply),
                StatusEq(NO_ERROR));
    int32_t replyi = reply.readInt32();
    // Expect kKernelThreads (15) or kKernelThreads + 1 (16): the extra thread is the
    // pool thread spawned by startThreadPool(), which may not have registered yet.
    EXPECT_TRUE(replyi == kKernelThreads || replyi == kKernelThreads + 1);
    EXPECT_THAT(server->transact(BINDER_LIB_TEST_PROCESS_LOCK, data, &reply), NO_ERROR);

    /*
     * This will use all threads in the pool except the main pool thread.
     * The service should run fine without locking, and the thread count should
     * not exceed 16 (15 max + pool thread).
     */
    std::vector<std::thread> ts;
    for (size_t i = 0; i < kKernelThreads - 1; i++) {
        ts.push_back(std::thread([&] {
            // Each client thread needs its own Parcels: Parcel is not thread-safe,
            // so sharing `data`/`reply` across concurrent transact() calls (and with
            // the main thread's writeInt32 below) is a data race.
            Parcel local_data, local_reply;
            EXPECT_THAT(server->transact(BINDER_LIB_TEST_LOCK_UNLOCK, local_data, &local_reply),
                        NO_ERROR);
        }));
    }

    data.writeInt32(1);
    // Give a chance for all threads to be used.
    EXPECT_THAT(server->transact(BINDER_LIB_TEST_UNLOCK_AFTER_MS, data, &reply), NO_ERROR);

    for (auto &t : ts) {
        t.join();
    }

    EXPECT_THAT(server->transact(BINDER_LIB_TEST_GET_MAX_THREAD_COUNT, data, &reply),
                StatusEq(NO_ERROR));
    replyi = reply.readInt32();
    // No more than kKernelThreads + 1 (16) threads should exist.
    EXPECT_EQ(replyi, kKernelThreads + 1);
}

// Current wall-clock time as milliseconds since the Unix epoch.
size_t epochMillis() {
    const auto sinceEpoch = std::chrono::system_clock::now().time_since_epoch();
    return std::chrono::duration_cast<std::chrono::milliseconds>(sinceEpoch).count();
}

TEST_F(BinderLibTest, HangingServices) {
    Parcel data, reply;
    sp<IBinder> server = addServer();
    ASSERT_TRUE(server != nullptr);
    int32_t delay = 1000; // ms
    data.writeInt32(delay);
    // Server grabs its mutex and schedules an unlock after `delay` ms.
    EXPECT_THAT(server->transact(BINDER_LIB_TEST_PROCESS_TEMPORARY_LOCK, data, &reply), NO_ERROR);
    std::vector<std::thread> ts;
    size_t epochMsBefore = epochMillis();
    // One more client thread than the server's kernel threadpool size, so every
    // server thread blocks on the held mutex.
    for (size_t i = 0; i < kKernelThreads + 1; i++) {
        ts.push_back(std::thread([&] {
            // Per-thread Parcels: Parcel is not thread-safe, so concurrent
            // transact() calls must not share `data`/`reply`.
            Parcel local_data, local_reply;
            EXPECT_THAT(server->transact(BINDER_LIB_TEST_LOCK_UNLOCK, local_data, &local_reply),
                        NO_ERROR);
        }));
    }

    for (auto &t : ts) {
        t.join();
    }
    size_t epochMsAfter = epochMillis();

    // Clients only finish once the temporary lock is released after `delay` ms,
    // so at least that much wall time must have elapsed.
    EXPECT_GE(epochMsAfter, epochMsBefore + delay);
}

class BinderLibRpcTestBase : public BinderLibTest {
public:
    void SetUp() override {
@@ -1638,11 +1715,41 @@ public:
            case BINDER_LIB_TEST_CAN_GET_SID: {
                return IPCThreadState::self()->getCallingSid() == nullptr ? BAD_VALUE : NO_ERROR;
            }
            case BINDER_LIB_TEST_GET_MAX_THREAD_COUNT: {
                reply->writeInt32(ProcessState::self()->getThreadPoolMaxTotalThreadCount());
                return NO_ERROR;
            }
            case BINDER_LIB_TEST_PROCESS_LOCK: {
                blockMutex.lock();
                return NO_ERROR;
            }
            case BINDER_LIB_TEST_LOCK_UNLOCK: {
                std::lock_guard<std::mutex> _l(blockMutex);
                return NO_ERROR;
            }
            case BINDER_LIB_TEST_UNLOCK_AFTER_MS: {
                int32_t ms = data.readInt32();
                return unlockInMs(ms);
            }
            case BINDER_LIB_TEST_PROCESS_TEMPORARY_LOCK: {
                blockMutex.lock();
                std::thread t([&] {
                    unlockInMs(data.readInt32());
                }); // start local thread to unlock in 1s
                t.detach();
                return NO_ERROR;
            }
            default:
                return UNKNOWN_TRANSACTION;
        };
    }

    // Blocks the calling (binder) thread for `ms` milliseconds, then releases
    // blockMutex — the mutex taken by the PROCESS_LOCK / PROCESS_TEMPORARY_LOCK
    // handlers. Always returns NO_ERROR.
    // NOTE(review): assumes blockMutex is currently held by this process;
    // unlocking an unheld std::mutex is undefined behavior — confirm callers.
    status_t unlockInMs(int32_t ms) {
        usleep(ms * 1000);
        blockMutex.unlock();
        return NO_ERROR;
    }

private:
    int32_t m_id;
    int32_t m_nextServerId;
@@ -1653,6 +1760,7 @@ private:
    sp<IBinder> m_strongRef;
    sp<IBinder> m_callback;
    bool m_exitOnDestroy;
    std::mutex blockMutex;
};

int run_server(int index, int readypipefd, bool usePoll)
@@ -1754,6 +1862,7 @@ int run_server(int index, int readypipefd, bool usePoll)
             }
        }
    } else {
        ProcessState::self()->setThreadPoolMaxThreadCount(kKernelThreads);
        ProcessState::self()->startThreadPool();
        IPCThreadState::self()->joinThreadPool();
    }