Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 41cc06b0 authored by Atneya Nair's avatar Atneya Nair
Browse files

Add single thread executor to utils

Add a simple executor class which enqueues Runnable objects (for easy
usage with std::packaged_task) for execution on a single thread.

Test: atest executor_tests
Flag: EXEMPT util
Bug: 391668615
Change-Id: I68af4d1d8e90515208456171ec048e3a2674c82a
parent 9dee501c
Loading
Loading
Loading
Loading
+91 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2025 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <deque>
#include <mutex>

#include "Runnable.h"
#include "jthread.h"

namespace android::mediautils {

/**
 * A C++ implementation similar to a Java executor, which manages a thread which runs enqueued
 * runnable tasks in queue order. Spawns thread on construction and joins destruction
 */
class SingleThreadExecutor {
  public:
    SingleThreadExecutor() : thread_([this](stop_token stok) { run(stok); }) {}

    ~SingleThreadExecutor() { shutdown(/* dropTasks= */ true); }

    void enqueue(Runnable r) {
        if (!r) {
            return;
        } else {
            std::lock_guard l{mutex_};
            if (thread_.stop_requested()) return;
            task_list_.push_back(std::move(r));
        }
        cv_.notify_one();
    }

    /**
     * Request thread termination, optionally dropping any enqueued tasks.
     * Note: does not join thread in this method and no task cancellation.
     */
    void shutdown(bool dropTasks = false) {
        {
            std::lock_guard l{mutex_};
            if (thread_.stop_requested()) return;
            if (dropTasks) {
                task_list_.clear();
            }
            thread_.request_stop();  // fancy atomic bool, so no deadlock risk
        }
        // This condition variable notification is necessary since the stop_callback functionality
        // of stop_token is not fully implemented
        cv_.notify_one();
    }


  private:
    void run(stop_token stok) {
        std::unique_lock l{mutex_};
        while (true) {
            cv_.wait_for(l, std::chrono::seconds(3), [this, stok]() {
                return !task_list_.empty() || stok.stop_requested();
            });
            if (!task_list_.empty()) {
                Runnable r {std::move(task_list_.front())};
                task_list_.pop_front();
                l.unlock();
                r();
                l.lock();
            } else if (stok.stop_requested()) {
                break;
            } // else cv timeout
        }
    }

    std::condition_variable cv_;
    std::mutex mutex_;
    std::deque<Runnable> task_list_;
    jthread thread_;
};
}  // namespace android::mediautils
+3 −1
Original line number Diff line number Diff line
@@ -42,7 +42,7 @@ class stop_source {
    stop_token get_token() { return stop_token{*this}; }
    bool stop_requested() const { return cancellation_signal_.load(); }
    bool request_stop() {
        auto f = false;
        bool f = false;
        return cancellation_signal_.compare_exchange_strong(f, true);
    }

@@ -84,6 +84,8 @@ class jthread {

    bool request_stop() { return stop_source_.request_stop(); }

    bool stop_requested() const { return stop_source_.stop_requested(); }

  private:
    // order matters
    impl::stop_source stop_source_;
+1 −0
Original line number Diff line number Diff line
@@ -284,6 +284,7 @@ cc_test {
    name: "jthread_tests",
    defaults: ["libmediautils_tests_defaults"],
    srcs: [
        "executor_tests.cpp",
        "jthread_tests.cpp",
        "runnable_tests.cpp",
    ],
+78 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2025 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "executor_tests"

#include <mediautils/SingleThreadExecutor.h>
#include <mediautils/TidWrapper.h>

#include <future>

#include <gtest/gtest.h>

using namespace android::mediautils;

class ExecutorTests : public ::testing::Test {
  protected:
    void TearDown() override { executor_.shutdown(); }
    SingleThreadExecutor executor_;
};

TEST_F(ExecutorTests, TaskEnqueue) {
    std::atomic<int> counter = 0;
    std::packaged_task<int()> task1([&]() {
        counter++;
        return 7;
    });

    auto future1 = task1.get_future();
    executor_.enqueue(Runnable{std::move(task1)});
    EXPECT_EQ(future1.get(), 7);
    EXPECT_EQ(counter, 1);
}

TEST_F(ExecutorTests, TaskThread) {
    std::packaged_task<int()> task1([&]() { return getThreadIdWrapper(); });

    auto future1 = task1.get_future();
    executor_.enqueue(Runnable{std::move(task1)});
    EXPECT_NE(future1.get(), getThreadIdWrapper());
}

TEST_F(ExecutorTests, TaskOrder) {
    std::atomic<int> counter = 0;
    std::packaged_task<int()> task1([&]() { return counter++; });
    std::packaged_task<int()> task2([&]() { return counter++; });
    auto future1 = task1.get_future();
    auto future2 = task2.get_future();

    executor_.enqueue(Runnable{std::move(task1)});
    executor_.enqueue(Runnable{std::move(task2)});

    EXPECT_EQ(future1.get(), 0);
    EXPECT_EQ(future2.get(), 1);
    EXPECT_EQ(counter, 2);
}

TEST_F(ExecutorTests, EmptyTask) {
    // does not crash
    executor_.enqueue(Runnable{});
}

TEST_F(ExecutorTests, ShutdownTwice) {
    executor_.shutdown();
    executor_.shutdown();
}