Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 38332d52 authored by Alec Mouri's avatar Alec Mouri Committed by Android (Google) Code Review
Browse files

Merge "Use semaphore instead of condition variable" into tm-dev

parents 8ffb0288 8d7d0f42
Loading
Loading
Loading
Loading
+33 −23
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@
#define LOG_TAG "BackgroundExecutor"
#define ATRACE_TAG ATRACE_TAG_GRAPHICS

#include <utils/Log.h>

#include "BackgroundExecutor.h"

namespace android {
@@ -27,41 +29,49 @@ ANDROID_SINGLETON_STATIC_INSTANCE(BackgroundExecutor);

BackgroundExecutor::BackgroundExecutor() : Singleton<BackgroundExecutor>() {
    mThread = std::thread([&]() {
        bool done = false;
        while (!done) {
            std::vector<std::function<void()>> tasks;
            {
                std::unique_lock lock(mMutex);
                android::base::ScopedLockAssertion assumeLock(mMutex);
                mWorkAvailableCv.wait(lock,
                                      [&]() REQUIRES(mMutex) { return mDone || !mTasks.empty(); });
                tasks = std::move(mTasks);
                mTasks.clear();
                done = mDone;
            } // unlock mMutex
        LOG_ALWAYS_FATAL_IF(sem_init(&mSemaphore, 0, 0), "sem_init failed");
        while (!mDone) {
            LOG_ALWAYS_FATAL_IF(sem_wait(&mSemaphore), "sem_wait failed (%d)", errno);

            ftl::SmallVector<Work*, 10> workItems;

            Work* work = mWorks.pop();
            while (work) {
                workItems.push_back(work);
                work = mWorks.pop();
            }

            for (auto& task : tasks) {
            // Sequence numbers are guaranteed to be in intended order, as we assume a single
            // producer and single consumer.
            std::stable_sort(workItems.begin(), workItems.end(), [](Work* left, Work* right) {
                return left->sequence < right->sequence;
            });
            for (Work* work : workItems) {
                for (auto& task : work->tasks) {
                    task();
                }
                delete work;
            }
        }
    });
}

BackgroundExecutor::~BackgroundExecutor() {
    {
        std::scoped_lock lock(mMutex);
    mDone = true;
        mWorkAvailableCv.notify_all();
    }
    LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed");
    if (mThread.joinable()) {
        mThread.join();
        LOG_ALWAYS_FATAL_IF(sem_destroy(&mSemaphore), "sem_destroy failed");
    }
}

void BackgroundExecutor::execute(std::function<void()> task) {
    std::scoped_lock lock(mMutex);
    mTasks.emplace_back(std::move(task));
    mWorkAvailableCv.notify_all();
void BackgroundExecutor::sendCallbacks(Callbacks&& tasks) {
    Work* work = new Work();
    work->sequence = mSequence;
    work->tasks = std::move(tasks);
    mWorks.push(work);
    mSequence++;
    LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed");
}

} // namespace android
 No newline at end of file
+21 −6
Original line number Diff line number Diff line
@@ -16,9 +16,11 @@

#pragma once

#include <Tracing/LocklessStack.h>
#include <android-base/thread_annotations.h>
#include <ftl/small_vector.h>
#include <semaphore.h>
#include <utils/Singleton.h>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
@@ -30,13 +32,26 @@ class BackgroundExecutor : public Singleton<BackgroundExecutor> {
public:
    BackgroundExecutor();
    ~BackgroundExecutor();
    void execute(std::function<void()>);
    using Callbacks = ftl::SmallVector<std::function<void()>, 10>;
    // Queues callbacks onto a work queue to be executed by a background thread.
    // Note that this is not thread-safe - a single producer is assumed.
    void sendCallbacks(Callbacks&& tasks);

private:
    std::mutex mMutex;
    std::condition_variable mWorkAvailableCv GUARDED_BY(mMutex);
    bool mDone GUARDED_BY(mMutex) = false;
    std::vector<std::function<void()>> mTasks GUARDED_BY(mMutex);
    sem_t mSemaphore;
    std::atomic_bool mDone = false;

    // Sequence number for work items.
    // Work items are batched by sequence number. Work items for earlier sequence numbers are
    // executed first. Work items with the same sequence number are executed in the same order they
    // were added to the stack (meaning the stack must reverse the order after popping from the
    // queue)
    int32_t mSequence = 0;
    struct Work {
        int32_t sequence = 0;
        Callbacks tasks;
    };
    LocklessStack<Work> mWorks;
    std::thread mThread;
};

+7 −7
Original line number Diff line number Diff line
@@ -3226,7 +3226,7 @@ void SurfaceFlinger::updateInputFlinger() {
    if (!updateWindowInfo && mInputWindowCommands.empty()) {
        return;
    }
    BackgroundExecutor::getInstance().execute([updateWindowInfo,
    BackgroundExecutor::getInstance().sendCallbacks({[updateWindowInfo,
                                                      windowInfos = std::move(windowInfos),
                                                      displayInfos = std::move(displayInfos),
                                                      inputWindowCommands =
@@ -3244,7 +3244,7 @@ void SurfaceFlinger::updateInputFlinger() {
        for (const auto& focusRequest : inputWindowCommands.focusRequests) {
            inputFlinger->setFocusedWindow(focusRequest);
        }
    });
    }});

    mInputWindowCommands.clear();
}
+4 −1
Original line number Diff line number Diff line
@@ -185,6 +185,7 @@ void TransactionCallbackInvoker::addPresentFence(const sp<Fence>& presentFence)
void TransactionCallbackInvoker::sendCallbacks(bool onCommitOnly) {
    // For each listener
    auto completedTransactionsItr = mCompletedTransactions.begin();
    BackgroundExecutor::Callbacks callbacks;
    while (completedTransactionsItr != mCompletedTransactions.end()) {
        auto& [listener, transactionStatsDeque] = *completedTransactionsItr;
        ListenerStats listenerStats;
@@ -219,7 +220,7 @@ void TransactionCallbackInvoker::sendCallbacks(bool onCommitOnly) {
                // keep it as an IBinder due to consistency reasons: if we
                // interface_cast at the IPC boundary when reading a Parcel,
                // we get pointers that compare unequal in the SF process.
                BackgroundExecutor::getInstance().execute([stats = std::move(listenerStats)]() {
                callbacks.emplace_back([stats = std::move(listenerStats)]() {
                    interface_cast<ITransactionCompletedListener>(stats.listener)
                            ->onTransactionCompleted(stats);
                });
@@ -231,6 +232,8 @@ void TransactionCallbackInvoker::sendCallbacks(bool onCommitOnly) {
    if (mPresentFence) {
        mPresentFence.clear();
    }

    BackgroundExecutor::getInstance().sendCallbacks(std::move(callbacks));
}

// -----------------------------------------------------------------------