Loading media/utils/include/mediautils/SingleThreadExecutor.h 0 → 100644 +91 −0 Original line number Diff line number Diff line /* * Copyright (C) 2025 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <deque> #include <mutex> #include "Runnable.h" #include "jthread.h" namespace android::mediautils { /** * A C++ implementation similar to a Java executor, which manages a thread which runs enqueued * runnable tasks in queue order. Spawns thread on construction and joins destruction */ class SingleThreadExecutor { public: SingleThreadExecutor() : thread_([this](stop_token stok) { run(stok); }) {} ~SingleThreadExecutor() { shutdown(/* dropTasks= */ true); } void enqueue(Runnable r) { if (!r) { return; } else { std::lock_guard l{mutex_}; if (thread_.stop_requested()) return; task_list_.push_back(std::move(r)); } cv_.notify_one(); } /** * Request thread termination, optionally dropping any enqueued tasks. * Note: does not join thread in this method and no task cancellation. */ void shutdown(bool dropTasks = false) { { std::lock_guard l{mutex_}; if (thread_.stop_requested()) return; if (dropTasks) { task_list_.clear(); } thread_.request_stop(); // fancy atomic bool, so no deadlock risk } // This condition variable notification is necessary since the stop_callback functionality // of stop_token is not fully implemented cv_.notify_one(); } private: void run(stop_token stok) { std::unique_lock l{mutex_}; while (true) { cv_.wait_for(l, std::chrono::seconds(3), [this, stok]() { return !task_list_.empty() || stok.stop_requested(); }); if (!task_list_.empty()) { Runnable r {std::move(task_list_.front())}; task_list_.pop_front(); l.unlock(); r(); l.lock(); } else if (stok.stop_requested()) { break; } // else cv timeout } } std::condition_variable cv_; std::mutex mutex_; std::deque<Runnable> task_list_; jthread thread_; }; } // namespace android::mediautils media/utils/include/mediautils/jthread.h +3 −1 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ class stop_source { stop_token get_token() { return stop_token{*this}; } bool stop_requested() const { return cancellation_signal_.load(); } bool request_stop() { auto f = false; bool f = false; return cancellation_signal_.compare_exchange_strong(f, true); } Loading Loading @@ -84,6 +84,8 @@ class jthread { bool request_stop() { return stop_source_.request_stop(); } bool stop_requested() const { return stop_source_.stop_requested(); } private: // order matters impl::stop_source stop_source_; Loading media/utils/tests/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -284,6 +284,7 @@ cc_test { name: "jthread_tests", defaults: ["libmediautils_tests_defaults"], srcs: [ "executor_tests.cpp", "jthread_tests.cpp", "runnable_tests.cpp", ], Loading media/utils/tests/executor_tests.cpp 0 → 100644 +78 −0 Original line number Diff line number Diff line /* * Copyright (C) 2025 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "executor_tests" #include <mediautils/SingleThreadExecutor.h> #include <mediautils/TidWrapper.h> #include <future> #include <gtest/gtest.h> using namespace android::mediautils; class ExecutorTests : public ::testing::Test { protected: void TearDown() override { executor_.shutdown(); } SingleThreadExecutor executor_; }; TEST_F(ExecutorTests, TaskEnqueue) { std::atomic<int> counter = 0; std::packaged_task<int()> task1([&]() { counter++; return 7; }); auto future1 = task1.get_future(); executor_.enqueue(Runnable{std::move(task1)}); EXPECT_EQ(future1.get(), 7); EXPECT_EQ(counter, 1); } TEST_F(ExecutorTests, TaskThread) { std::packaged_task<int()> task1([&]() { return getThreadIdWrapper(); }); auto future1 = task1.get_future(); executor_.enqueue(Runnable{std::move(task1)}); EXPECT_NE(future1.get(), getThreadIdWrapper()); } TEST_F(ExecutorTests, TaskOrder) { std::atomic<int> counter = 0; std::packaged_task<int()> task1([&]() { return counter++; }); std::packaged_task<int()> task2([&]() { return counter++; }); auto future1 = task1.get_future(); auto future2 = task2.get_future(); executor_.enqueue(Runnable{std::move(task1)}); executor_.enqueue(Runnable{std::move(task2)}); EXPECT_EQ(future1.get(), 0); EXPECT_EQ(future2.get(), 1); EXPECT_EQ(counter, 2); } TEST_F(ExecutorTests, EmptyTask) { // does not crash executor_.enqueue(Runnable{}); } TEST_F(ExecutorTests, ShutdownTwice) { executor_.shutdown(); executor_.shutdown(); } Loading
media/utils/include/mediautils/SingleThreadExecutor.h 0 → 100644 +91 −0 Original line number Diff line number Diff line /* * Copyright (C) 2025 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <deque> #include <mutex> #include "Runnable.h" #include "jthread.h" namespace android::mediautils { /** * A C++ implementation similar to a Java executor, which manages a thread which runs enqueued * runnable tasks in queue order. Spawns thread on construction and joins destruction */ class SingleThreadExecutor { public: SingleThreadExecutor() : thread_([this](stop_token stok) { run(stok); }) {} ~SingleThreadExecutor() { shutdown(/* dropTasks= */ true); } void enqueue(Runnable r) { if (!r) { return; } else { std::lock_guard l{mutex_}; if (thread_.stop_requested()) return; task_list_.push_back(std::move(r)); } cv_.notify_one(); } /** * Request thread termination, optionally dropping any enqueued tasks. * Note: does not join thread in this method and no task cancellation. */ void shutdown(bool dropTasks = false) { { std::lock_guard l{mutex_}; if (thread_.stop_requested()) return; if (dropTasks) { task_list_.clear(); } thread_.request_stop(); // fancy atomic bool, so no deadlock risk } // This condition variable notification is necessary since the stop_callback functionality // of stop_token is not fully implemented cv_.notify_one(); } private: void run(stop_token stok) { std::unique_lock l{mutex_}; while (true) { cv_.wait_for(l, std::chrono::seconds(3), [this, stok]() { return !task_list_.empty() || stok.stop_requested(); }); if (!task_list_.empty()) { Runnable r {std::move(task_list_.front())}; task_list_.pop_front(); l.unlock(); r(); l.lock(); } else if (stok.stop_requested()) { break; } // else cv timeout } } std::condition_variable cv_; std::mutex mutex_; std::deque<Runnable> task_list_; jthread thread_; }; } // namespace android::mediautils
media/utils/include/mediautils/jthread.h +3 −1 Original line number Diff line number Diff line Loading @@ -42,7 +42,7 @@ class stop_source { stop_token get_token() { return stop_token{*this}; } bool stop_requested() const { return cancellation_signal_.load(); } bool request_stop() { auto f = false; bool f = false; return cancellation_signal_.compare_exchange_strong(f, true); } Loading Loading @@ -84,6 +84,8 @@ class jthread { bool request_stop() { return stop_source_.request_stop(); } bool stop_requested() const { return stop_source_.stop_requested(); } private: // order matters impl::stop_source stop_source_; Loading
media/utils/tests/Android.bp +1 −0 Original line number Diff line number Diff line Loading @@ -284,6 +284,7 @@ cc_test { name: "jthread_tests", defaults: ["libmediautils_tests_defaults"], srcs: [ "executor_tests.cpp", "jthread_tests.cpp", "runnable_tests.cpp", ], Loading
media/utils/tests/executor_tests.cpp 0 → 100644 +78 −0 Original line number Diff line number Diff line /* * Copyright (C) 2025 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "executor_tests" #include <mediautils/SingleThreadExecutor.h> #include <mediautils/TidWrapper.h> #include <future> #include <gtest/gtest.h> using namespace android::mediautils; class ExecutorTests : public ::testing::Test { protected: void TearDown() override { executor_.shutdown(); } SingleThreadExecutor executor_; }; TEST_F(ExecutorTests, TaskEnqueue) { std::atomic<int> counter = 0; std::packaged_task<int()> task1([&]() { counter++; return 7; }); auto future1 = task1.get_future(); executor_.enqueue(Runnable{std::move(task1)}); EXPECT_EQ(future1.get(), 7); EXPECT_EQ(counter, 1); } TEST_F(ExecutorTests, TaskThread) { std::packaged_task<int()> task1([&]() { return getThreadIdWrapper(); }); auto future1 = task1.get_future(); executor_.enqueue(Runnable{std::move(task1)}); EXPECT_NE(future1.get(), getThreadIdWrapper()); } TEST_F(ExecutorTests, TaskOrder) { std::atomic<int> counter = 0; std::packaged_task<int()> task1([&]() { return counter++; }); std::packaged_task<int()> task2([&]() { return counter++; }); auto future1 = task1.get_future(); auto future2 = task2.get_future(); executor_.enqueue(Runnable{std::move(task1)}); executor_.enqueue(Runnable{std::move(task2)}); EXPECT_EQ(future1.get(), 0); EXPECT_EQ(future2.get(), 1); EXPECT_EQ(counter, 2); } TEST_F(ExecutorTests, EmptyTask) { // does not crash executor_.enqueue(Runnable{}); } TEST_F(ExecutorTests, ShutdownTwice) { executor_.shutdown(); executor_.shutdown(); }