Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c556e469 authored by Alec Mouri's avatar Alec Mouri Committed by Automerger Merge Worker
Browse files

Merge "Use semaphore instead of condition variable" into tm-dev am: 38332d52

parents 60a3a448 38332d52
Loading
Loading
Loading
Loading
+33 −23
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@
#define LOG_TAG "BackgroundExecutor"
#define ATRACE_TAG ATRACE_TAG_GRAPHICS

#include <utils/Log.h>

#include "BackgroundExecutor.h"

namespace android {
@@ -27,41 +29,49 @@ ANDROID_SINGLETON_STATIC_INSTANCE(BackgroundExecutor);

BackgroundExecutor::BackgroundExecutor() : Singleton<BackgroundExecutor>() {
    mThread = std::thread([&]() {
        bool done = false;
        while (!done) {
            std::vector<std::function<void()>> tasks;
            {
                std::unique_lock lock(mMutex);
                android::base::ScopedLockAssertion assumeLock(mMutex);
                mWorkAvailableCv.wait(lock,
                                      [&]() REQUIRES(mMutex) { return mDone || !mTasks.empty(); });
                tasks = std::move(mTasks);
                mTasks.clear();
                done = mDone;
            } // unlock mMutex
        LOG_ALWAYS_FATAL_IF(sem_init(&mSemaphore, 0, 0), "sem_init failed");
        while (!mDone) {
            LOG_ALWAYS_FATAL_IF(sem_wait(&mSemaphore), "sem_wait failed (%d)", errno);

            ftl::SmallVector<Work*, 10> workItems;

            Work* work = mWorks.pop();
            while (work) {
                workItems.push_back(work);
                work = mWorks.pop();
            }

            for (auto& task : tasks) {
            // Sequence numbers are guaranteed to be in intended order, as we assume a single
            // producer and single consumer.
            std::stable_sort(workItems.begin(), workItems.end(), [](Work* left, Work* right) {
                return left->sequence < right->sequence;
            });
            for (Work* work : workItems) {
                for (auto& task : work->tasks) {
                    task();
                }
                delete work;
            }
        }
    });
}

BackgroundExecutor::~BackgroundExecutor() {
    {
        std::scoped_lock lock(mMutex);
    mDone = true;
        mWorkAvailableCv.notify_all();
    }
    LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed");
    if (mThread.joinable()) {
        mThread.join();
        LOG_ALWAYS_FATAL_IF(sem_destroy(&mSemaphore), "sem_destroy failed");
    }
}

void BackgroundExecutor::execute(std::function<void()> task) {
    std::scoped_lock lock(mMutex);
    mTasks.emplace_back(std::move(task));
    mWorkAvailableCv.notify_all();
void BackgroundExecutor::sendCallbacks(Callbacks&& tasks) {
    Work* work = new Work();
    work->sequence = mSequence;
    work->tasks = std::move(tasks);
    mWorks.push(work);
    mSequence++;
    LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed");
}

} // namespace android
 No newline at end of file
+21 −6
Original line number Diff line number Diff line
@@ -16,9 +16,11 @@

#pragma once

#include <Tracing/LocklessStack.h>
#include <android-base/thread_annotations.h>
#include <ftl/small_vector.h>
#include <semaphore.h>
#include <utils/Singleton.h>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
@@ -30,13 +32,26 @@ class BackgroundExecutor : public Singleton<BackgroundExecutor> {
public:
    BackgroundExecutor();
    ~BackgroundExecutor();
    void execute(std::function<void()>);
    using Callbacks = ftl::SmallVector<std::function<void()>, 10>;
    // Queues callbacks onto a work queue to be executed by a background thread.
    // Note that this is not thread-safe - a single producer is assumed.
    void sendCallbacks(Callbacks&& tasks);

private:
    std::mutex mMutex;
    std::condition_variable mWorkAvailableCv GUARDED_BY(mMutex);
    bool mDone GUARDED_BY(mMutex) = false;
    std::vector<std::function<void()>> mTasks GUARDED_BY(mMutex);
    sem_t mSemaphore;
    std::atomic_bool mDone = false;

    // Sequence number for work items.
    // Work items are batched by sequence number. Work items for earlier sequence numbers are
    // executed first. Work items with the same sequence number are executed in the same order they
    // were added to the stack (meaning the stack must reverse the order after popping from the
    // queue)
    int32_t mSequence = 0;
    struct Work {
        int32_t sequence = 0;
        Callbacks tasks;
    };
    LocklessStack<Work> mWorks;
    std::thread mThread;
};

+7 −7
Original line number Diff line number Diff line
@@ -3226,7 +3226,7 @@ void SurfaceFlinger::updateInputFlinger() {
    if (!updateWindowInfo && mInputWindowCommands.empty()) {
        return;
    }
    BackgroundExecutor::getInstance().execute([updateWindowInfo,
    BackgroundExecutor::getInstance().sendCallbacks({[updateWindowInfo,
                                                      windowInfos = std::move(windowInfos),
                                                      displayInfos = std::move(displayInfos),
                                                      inputWindowCommands =
@@ -3244,7 +3244,7 @@ void SurfaceFlinger::updateInputFlinger() {
        for (const auto& focusRequest : inputWindowCommands.focusRequests) {
            inputFlinger->setFocusedWindow(focusRequest);
        }
    });
    }});

    mInputWindowCommands.clear();
}
+4 −1
Original line number Diff line number Diff line
@@ -185,6 +185,7 @@ void TransactionCallbackInvoker::addPresentFence(const sp<Fence>& presentFence)
void TransactionCallbackInvoker::sendCallbacks(bool onCommitOnly) {
    // For each listener
    auto completedTransactionsItr = mCompletedTransactions.begin();
    BackgroundExecutor::Callbacks callbacks;
    while (completedTransactionsItr != mCompletedTransactions.end()) {
        auto& [listener, transactionStatsDeque] = *completedTransactionsItr;
        ListenerStats listenerStats;
@@ -219,7 +220,7 @@ void TransactionCallbackInvoker::sendCallbacks(bool onCommitOnly) {
                // keep it as an IBinder due to consistency reasons: if we
                // interface_cast at the IPC boundary when reading a Parcel,
                // we get pointers that compare unequal in the SF process.
                BackgroundExecutor::getInstance().execute([stats = std::move(listenerStats)]() {
                callbacks.emplace_back([stats = std::move(listenerStats)]() {
                    interface_cast<ITransactionCompletedListener>(stats.listener)
                            ->onTransactionCompleted(stats);
                });
@@ -231,6 +232,8 @@ void TransactionCallbackInvoker::sendCallbacks(bool onCommitOnly) {
    if (mPresentFence) {
        mPresentFence.clear();
    }

    BackgroundExecutor::getInstance().sendCallbacks(std::move(callbacks));
}

// -----------------------------------------------------------------------