Loading services/surfaceflinger/BackgroundExecutor.cpp +11 −25 Original line number Diff line number Diff line Loading @@ -28,29 +28,19 @@ namespace android { ANDROID_SINGLETON_STATIC_INSTANCE(BackgroundExecutor); BackgroundExecutor::BackgroundExecutor() : Singleton<BackgroundExecutor>() { mThread = std::thread([&]() { // mSemaphore must be initialized before any calls to // BackgroundExecutor::sendCallbacks. For this reason, we initialize it // within the constructor instead of within mThread. LOG_ALWAYS_FATAL_IF(sem_init(&mSemaphore, 0, 0), "sem_init failed"); mThread = std::thread([&]() { while (!mDone) { LOG_ALWAYS_FATAL_IF(sem_wait(&mSemaphore), "sem_wait failed (%d)", errno); ftl::SmallVector<Work*, 10> workItems; Work* work = mWorks.pop(); while (work) { workItems.push_back(work); work = mWorks.pop(); } // Sequence numbers are guaranteed to be in intended order, as we assume a single // producer and single consumer. std::stable_sort(workItems.begin(), workItems.end(), [](Work* left, Work* right) { return left->sequence < right->sequence; }); for (Work* work : workItems) { for (auto& task : work->tasks) { task(); auto callbacks = mCallbacksQueue.pop(); if (!callbacks) { continue; } delete work; for (auto& callback : *callbacks) { callback(); } } }); Loading @@ -66,11 +56,7 @@ BackgroundExecutor::~BackgroundExecutor() { } void BackgroundExecutor::sendCallbacks(Callbacks&& tasks) { Work* work = new Work(); work->sequence = mSequence; work->tasks = std::move(tasks); mWorks.push(work); mSequence++; mCallbacksQueue.push(std::move(tasks)); LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed"); } Loading services/surfaceflinger/BackgroundExecutor.h +4 −16 Original line number Diff line number Diff line Loading @@ -16,15 +16,13 @@ #pragma once #include <Tracing/LocklessStack.h> #include <android-base/thread_annotations.h> #include <ftl/small_vector.h> #include <semaphore.h> #include <utils/Singleton.h> #include <mutex> #include <queue> #include <thread> #include "LocklessQueue.h" namespace android { // Executes tasks off the main thread. Loading @@ -34,24 +32,14 @@ public: ~BackgroundExecutor(); using Callbacks = ftl::SmallVector<std::function<void()>, 10>; // Queues callbacks onto a work queue to be executed by a background thread. // Note that this is not thread-safe - a single producer is assumed. // This is safe to call from multiple threads. void sendCallbacks(Callbacks&& tasks); private: sem_t mSemaphore; std::atomic_bool mDone = false; // Sequence number for work items. // Work items are batched by sequence number. Work items for earlier sequence numbers are // executed first. Work items with the same sequence number are executed in the same order they // were added to the stack (meaning the stack must reverse the order after popping from the // queue) int32_t mSequence = 0; struct Work { int32_t sequence = 0; Callbacks tasks; }; LocklessStack<Work> mWorks; LocklessQueue<Callbacks> mCallbacksQueue; std::thread mThread; }; Loading services/surfaceflinger/tests/unittests/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -71,6 +71,7 @@ cc_test { ":libsurfaceflinger_sources", "libsurfaceflinger_unittest_main.cpp", "ActiveDisplayRotationFlagsTest.cpp", "BackgroundExecutorTest.cpp", "CompositionTest.cpp", "DisplayIdGeneratorTest.cpp", "DisplayTransactionTest.cpp", Loading services/surfaceflinger/tests/unittests/BackgroundExecutorTest.cpp 0 → 100644 +57 −0 Original line number Diff line number Diff line #include <gtest/gtest.h> #include <condition_variable> #include "BackgroundExecutor.h" namespace android { class BackgroundExecutorTest : public testing::Test {}; namespace { TEST_F(BackgroundExecutorTest, singleProducer) { std::mutex mutex; std::condition_variable condition_variable; bool backgroundTaskComplete = false; BackgroundExecutor::getInstance().sendCallbacks( {[&mutex, &condition_variable, &backgroundTaskComplete]() { std::lock_guard<std::mutex> lock{mutex}; condition_variable.notify_one(); backgroundTaskComplete = true; }}); std::unique_lock<std::mutex> lock{mutex}; condition_variable.wait(lock, [&backgroundTaskComplete]() { return backgroundTaskComplete; }); ASSERT_TRUE(backgroundTaskComplete); } TEST_F(BackgroundExecutorTest, multipleProducers) { std::mutex mutex; std::condition_variable condition_variable; const int backgroundTaskCount = 10; int backgroundTaskCompleteCount = 0; for (int i = 0; i < backgroundTaskCount; i++) { std::thread([&mutex, &condition_variable, &backgroundTaskCompleteCount]() { BackgroundExecutor::getInstance().sendCallbacks( {[&mutex, &condition_variable, &backgroundTaskCompleteCount]() { std::lock_guard<std::mutex> lock{mutex}; backgroundTaskCompleteCount++; if (backgroundTaskCompleteCount == backgroundTaskCount) { condition_variable.notify_one(); } }}); }).detach(); } std::unique_lock<std::mutex> lock{mutex}; condition_variable.wait(lock, [&backgroundTaskCompleteCount]() { return backgroundTaskCompleteCount == backgroundTaskCount; }); ASSERT_EQ(backgroundTaskCount, backgroundTaskCompleteCount); } } // namespace } // namespace android Loading
services/surfaceflinger/BackgroundExecutor.cpp +11 −25 Original line number Diff line number Diff line Loading @@ -28,29 +28,19 @@ namespace android { ANDROID_SINGLETON_STATIC_INSTANCE(BackgroundExecutor); BackgroundExecutor::BackgroundExecutor() : Singleton<BackgroundExecutor>() { mThread = std::thread([&]() { // mSemaphore must be initialized before any calls to // BackgroundExecutor::sendCallbacks. For this reason, we initialize it // within the constructor instead of within mThread. LOG_ALWAYS_FATAL_IF(sem_init(&mSemaphore, 0, 0), "sem_init failed"); mThread = std::thread([&]() { while (!mDone) { LOG_ALWAYS_FATAL_IF(sem_wait(&mSemaphore), "sem_wait failed (%d)", errno); ftl::SmallVector<Work*, 10> workItems; Work* work = mWorks.pop(); while (work) { workItems.push_back(work); work = mWorks.pop(); } // Sequence numbers are guaranteed to be in intended order, as we assume a single // producer and single consumer. std::stable_sort(workItems.begin(), workItems.end(), [](Work* left, Work* right) { return left->sequence < right->sequence; }); for (Work* work : workItems) { for (auto& task : work->tasks) { task(); auto callbacks = mCallbacksQueue.pop(); if (!callbacks) { continue; } delete work; for (auto& callback : *callbacks) { callback(); } } }); Loading @@ -66,11 +56,7 @@ BackgroundExecutor::~BackgroundExecutor() { } void BackgroundExecutor::sendCallbacks(Callbacks&& tasks) { Work* work = new Work(); work->sequence = mSequence; work->tasks = std::move(tasks); mWorks.push(work); mSequence++; mCallbacksQueue.push(std::move(tasks)); LOG_ALWAYS_FATAL_IF(sem_post(&mSemaphore), "sem_post failed"); } Loading
services/surfaceflinger/BackgroundExecutor.h +4 −16 Original line number Diff line number Diff line Loading @@ -16,15 +16,13 @@ #pragma once #include <Tracing/LocklessStack.h> #include <android-base/thread_annotations.h> #include <ftl/small_vector.h> #include <semaphore.h> #include <utils/Singleton.h> #include <mutex> #include <queue> #include <thread> #include "LocklessQueue.h" namespace android { // Executes tasks off the main thread. Loading @@ -34,24 +32,14 @@ public: ~BackgroundExecutor(); using Callbacks = ftl::SmallVector<std::function<void()>, 10>; // Queues callbacks onto a work queue to be executed by a background thread. // Note that this is not thread-safe - a single producer is assumed. // This is safe to call from multiple threads. void sendCallbacks(Callbacks&& tasks); private: sem_t mSemaphore; std::atomic_bool mDone = false; // Sequence number for work items. // Work items are batched by sequence number. Work items for earlier sequence numbers are // executed first. Work items with the same sequence number are executed in the same order they // were added to the stack (meaning the stack must reverse the order after popping from the // queue) int32_t mSequence = 0; struct Work { int32_t sequence = 0; Callbacks tasks; }; LocklessStack<Work> mWorks; LocklessQueue<Callbacks> mCallbacksQueue; std::thread mThread; }; Loading
services/surfaceflinger/tests/unittests/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -71,6 +71,7 @@ cc_test { ":libsurfaceflinger_sources", "libsurfaceflinger_unittest_main.cpp", "ActiveDisplayRotationFlagsTest.cpp", "BackgroundExecutorTest.cpp", "CompositionTest.cpp", "DisplayIdGeneratorTest.cpp", "DisplayTransactionTest.cpp", Loading
services/surfaceflinger/tests/unittests/BackgroundExecutorTest.cpp 0 → 100644 +57 −0 Original line number Diff line number Diff line #include <gtest/gtest.h> #include <condition_variable> #include "BackgroundExecutor.h" namespace android { class BackgroundExecutorTest : public testing::Test {}; namespace { TEST_F(BackgroundExecutorTest, singleProducer) { std::mutex mutex; std::condition_variable condition_variable; bool backgroundTaskComplete = false; BackgroundExecutor::getInstance().sendCallbacks( {[&mutex, &condition_variable, &backgroundTaskComplete]() { std::lock_guard<std::mutex> lock{mutex}; condition_variable.notify_one(); backgroundTaskComplete = true; }}); std::unique_lock<std::mutex> lock{mutex}; condition_variable.wait(lock, [&backgroundTaskComplete]() { return backgroundTaskComplete; }); ASSERT_TRUE(backgroundTaskComplete); } TEST_F(BackgroundExecutorTest, multipleProducers) { std::mutex mutex; std::condition_variable condition_variable; const int backgroundTaskCount = 10; int backgroundTaskCompleteCount = 0; for (int i = 0; i < backgroundTaskCount; i++) { std::thread([&mutex, &condition_variable, &backgroundTaskCompleteCount]() { BackgroundExecutor::getInstance().sendCallbacks( {[&mutex, &condition_variable, &backgroundTaskCompleteCount]() { std::lock_guard<std::mutex> lock{mutex}; backgroundTaskCompleteCount++; if (backgroundTaskCompleteCount == backgroundTaskCount) { condition_variable.notify_one(); } }}); }).detach(); } std::unique_lock<std::mutex> lock{mutex}; condition_variable.wait(lock, [&backgroundTaskCompleteCount]() { return backgroundTaskCompleteCount == backgroundTaskCount; }); ASSERT_EQ(backgroundTaskCount, backgroundTaskCompleteCount); } } // namespace } // namespace android