Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 25ff3459 authored by Treehugger Robot's avatar Treehugger Robot Committed by Automerger Merge Worker
Browse files

Merge "Update BackgroundExecutor to use LocklessQueue" into udc-dev am: 052dcb5c

parents 1b66f5be 052dcb5c
Loading
Loading
Loading
Loading
+11 −25
Original line number Diff line number Diff line
@@ -28,29 +28,19 @@ namespace android {
ANDROID_SINGLETON_STATIC_INSTANCE(BackgroundExecutor);

BackgroundExecutor::BackgroundExecutor() : Singleton<BackgroundExecutor>() {
    mThread = std::thread([&]() {
    // mSemaphore must be initialized before any calls to
    // BackgroundExecutor::sendCallbacks. For this reason, we initialize it
    // within the constructor instead of within mThread.
    LOG_ALWAYS_FATAL_IF(sem_init(&mSemaphore, 0, 0), "sem_init failed");
    mThread = std::thread([&]() {
        while (!mDone) {
            LOG_ALWAYS_FATAL_IF(sem_wait(&mSemaphore), "sem_wait failed (%d)", errno);

            ftl::SmallVector<Work*, 10> workItems;

            Work* work = mWorks.pop();
            while (work) {
                workItems.push_back(work);
                work = mWorks.pop();
            }

            // Sequence numbers are guaranteed to be in intended order, as we assume a single
            // producer and single consumer.
            std::stable_sort(workItems.begin(), workItems.end(), [](Work* left, Work* right) {
                return left->sequence < right->sequence;
            });
            for (Work* work : workItems) {
                for (auto& task : work->tasks) {
                    task();
            auto callbacks = mCallbacksQueue.pop();
            if (!callbacks) {
                continue;
            }
                delete work;
            for (auto& callback : *callbacks) {
                callback();
            }
        }
    });
@@ -66,11 +56,7 @@ BackgroundExecutor::~BackgroundExecutor() {
}

void BackgroundExecutor::sendCallbacks(Callbacks&& tasks) {
    Work* work = new Work();
    work->sequence = mSequence;
    work->tasks = std::move(tasks);
    mWorks.push(work);
    mSequence++;
    mCallbacksQueue.push(std::move(tasks));
    LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed");
}

+4 −16
Original line number Diff line number Diff line
@@ -16,15 +16,13 @@

#pragma once

#include <Tracing/LocklessStack.h>
#include <android-base/thread_annotations.h>
#include <ftl/small_vector.h>
#include <semaphore.h>
#include <utils/Singleton.h>
#include <mutex>
#include <queue>
#include <thread>

#include "LocklessQueue.h"

namespace android {

// Executes tasks off the main thread.
@@ -34,24 +32,14 @@ public:
    ~BackgroundExecutor();
    using Callbacks = ftl::SmallVector<std::function<void()>, 10>;
    // Queues callbacks onto a work queue to be executed by a background thread.
    // Note that this is not thread-safe - a single producer is assumed.
    // This is safe to call from multiple threads.
    void sendCallbacks(Callbacks&& tasks);

private:
    sem_t mSemaphore;
    std::atomic_bool mDone = false;

    // Sequence number for work items.
    // Work items are batched by sequence number. Work items for earlier sequence numbers are
    // executed first. Work items with the same sequence number are executed in the same order they
    // were added to the stack (meaning the stack must reverse the order after popping from the
    // queue)
    int32_t mSequence = 0;
    struct Work {
        int32_t sequence = 0;
        Callbacks tasks;
    };
    LocklessStack<Work> mWorks;
    LocklessQueue<Callbacks> mCallbacksQueue;
    std::thread mThread;
};

+1 −0
Original line number Diff line number Diff line
@@ -71,6 +71,7 @@ cc_test {
        ":libsurfaceflinger_sources",
        "libsurfaceflinger_unittest_main.cpp",
        "ActiveDisplayRotationFlagsTest.cpp",
        "BackgroundExecutorTest.cpp",
        "CompositionTest.cpp",
        "DisplayIdGeneratorTest.cpp",
        "DisplayTransactionTest.cpp",
+57 −0
Original line number Diff line number Diff line
#include <gtest/gtest.h>
#include <condition_variable>

#include "BackgroundExecutor.h"

namespace android {

class BackgroundExecutorTest : public testing::Test {};

namespace {

TEST_F(BackgroundExecutorTest, singleProducer) {
    std::mutex mutex;
    std::condition_variable condition_variable;
    bool backgroundTaskComplete = false;

    BackgroundExecutor::getInstance().sendCallbacks(
            {[&mutex, &condition_variable, &backgroundTaskComplete]() {
                std::lock_guard<std::mutex> lock{mutex};
                condition_variable.notify_one();
                backgroundTaskComplete = true;
            }});

    std::unique_lock<std::mutex> lock{mutex};
    condition_variable.wait(lock, [&backgroundTaskComplete]() { return backgroundTaskComplete; });
    ASSERT_TRUE(backgroundTaskComplete);
}

TEST_F(BackgroundExecutorTest, multipleProducers) {
    std::mutex mutex;
    std::condition_variable condition_variable;
    const int backgroundTaskCount = 10;
    int backgroundTaskCompleteCount = 0;

    for (int i = 0; i < backgroundTaskCount; i++) {
        std::thread([&mutex, &condition_variable, &backgroundTaskCompleteCount]() {
            BackgroundExecutor::getInstance().sendCallbacks(
                    {[&mutex, &condition_variable, &backgroundTaskCompleteCount]() {
                        std::lock_guard<std::mutex> lock{mutex};
                        backgroundTaskCompleteCount++;
                        if (backgroundTaskCompleteCount == backgroundTaskCount) {
                            condition_variable.notify_one();
                        }
                    }});
        }).detach();
    }

    std::unique_lock<std::mutex> lock{mutex};
    condition_variable.wait(lock, [&backgroundTaskCompleteCount]() {
        return backgroundTaskCompleteCount == backgroundTaskCount;
    });
    ASSERT_EQ(backgroundTaskCount, backgroundTaskCompleteCount);
}

} // namespace

} // namespace android