Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1f3c852b authored by Ilya Matyukhin's avatar Ilya Matyukhin
Browse files

Implement a simple worker thread

Bug: 166800618
Bug: 175070939
Test: atest --host android.hardware.biometrics.fingerprint.WorkerThreadTest
Change-Id: Ic84efbde21d0997450585078b311610fe752fa88
parent 124e70a8
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
@@ -17,3 +17,16 @@ cc_binary {
        "android.hardware.biometrics.common-V1-ndk_platform",
    ],
}

cc_test_host {
    name: "android.hardware.biometrics.fingerprint.WorkerThreadTest",
    local_include_dirs: ["include"],
    srcs: [
        "tests/WorkerThreadTest.cpp",
        "WorkerThread.cpp",
    ],
    shared_libs: [
        "libcutils",
    ],
    test_suites: ["general-tests"],
}
+68 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "WorkerThread.h"

namespace aidl::android::hardware::biometrics::fingerprint {

// It's important that mThread is initialized after everything else because it runs a member
// function that may use any member of this class.
WorkerThread::WorkerThread(size_t maxQueueSize)
    : mMaxSize(maxQueueSize),
      mIsDestructing(false),
      mQueue(),
      mQueueMutex(),
      mQueueCond(),
      mThread(&WorkerThread::threadFunc, this) {}

WorkerThread::~WorkerThread() {
    // This is a signal for threadFunc to terminate as soon as possible, and a hint for schedule
    // that it doesn't need to do any work.
    mIsDestructing = true;
    mQueueCond.notify_all();
    mThread.join();
}

bool WorkerThread::schedule(Task&& task) {
    if (mIsDestructing) {
        return false;
    }

    std::unique_lock<std::mutex> lock(mQueueMutex);
    if (mQueue.size() >= mMaxSize) {
        return false;
    }
    mQueue.push_back(std::move(task));
    lock.unlock();
    mQueueCond.notify_one();
    return true;
}

void WorkerThread::threadFunc() {
    while (!mIsDestructing) {
        std::unique_lock<std::mutex> lock(mQueueMutex);
        mQueueCond.wait(lock, [this] { return !mQueue.empty() || mIsDestructing; });
        if (mIsDestructing) {
            return;
        }
        Task task = std::move(mQueue.front());
        mQueue.pop_front();
        lock.unlock();
        task();
    }
}

}  // namespace aidl::android::hardware::biometrics::fingerprint
+74 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <mutex>
#include <optional>
#include <queue>
#include <thread>

namespace aidl::android::hardware::biometrics::fingerprint {

using Task = std::function<void()>;

// A class that encapsulates a worker thread and a task queue, and provides a convenient interface
// for a Session to schedule its tasks for asynchronous execution.
class WorkerThread final {
  public:
    // Internally creates a queue that cannot exceed maxQueueSize elements and a new thread that
    // polls the queue for tasks until this instance is destructed.
    explicit WorkerThread(size_t maxQueueSize);

    // Unblocks the internal queue and calls join on the internal thread allowing it to gracefully
    // exit.
    ~WorkerThread();

    // Disallow copying this class.
    WorkerThread(const WorkerThread&) = delete;
    WorkerThread& operator=(const WorkerThread&) = delete;

    // Also disable moving this class to simplify implementation.
    WorkerThread(WorkerThread&&) = delete;
    WorkerThread& operator=(WorkerThread&&) = delete;

    // If the internal queue is not full, pushes a task at the end of the queue and returns true.
    // Otherwise, returns false. If the queue is busy, blocks until it becomes available.
    bool schedule(Task&& task);

  private:
    // The function that runs on the internal thread. Sequentially runs the available tasks from
    // the queue. If the queue is empty, waits until a new task is added. If the worker is being
    // destructed, finishes its current task and gracefully exits.
    void threadFunc();

    // The maximum size that the queue is allowed to expand to.
    size_t mMaxSize;

    // Whether the destructor was called. If true, tells threadFunc to exit as soon as possible, and
    // tells schedule to avoid doing any work.
    std::atomic<bool> mIsDestructing;

    // Queue that's guarded by mQueueMutex and mQueueCond.
    std::deque<Task> mQueue;
    std::mutex mQueueMutex;
    std::condition_variable mQueueCond;

    // The internal thread that works on the tasks from the queue.
    std::thread mThread;
};

}  // namespace aidl::android::hardware::biometrics::fingerprint
+105 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2021 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <algorithm>
#include <chrono>
#include <future>
#include <thread>

#include <gtest/gtest.h>

#include "WorkerThread.h"

namespace {

using aidl::android::hardware::biometrics::fingerprint::WorkerThread;
using namespace std::chrono_literals;

TEST(WorkerThreadTest, ScheduleReturnsTrueWhenQueueHasSpace) {
    WorkerThread worker(1 /*maxQueueSize*/);
    for (int i = 0; i < 100; ++i) {
        EXPECT_TRUE(worker.schedule([] {}));
        // Allow enough time for the previous task to be processed.
        std::this_thread::sleep_for(2ms);
    }
}

TEST(WorkerThreadTest, ScheduleReturnsFalseWhenQueueIsFull) {
    WorkerThread worker(2 /*maxQueueSize*/);
    // Add a long-running task.
    worker.schedule([] { std::this_thread::sleep_for(1s); });

    // Allow enough time for the worker to start working on the previous task.
    std::this_thread::sleep_for(2ms);

    // Fill the worker's queue to the maximum.
    worker.schedule([] {});
    worker.schedule([] {});

    EXPECT_FALSE(worker.schedule([] {}));
}

TEST(WorkerThreadTest, TasksExecuteInOrder) {
    constexpr int NUM_TASKS = 10000;
    WorkerThread worker(NUM_TASKS);

    std::vector<int> results;
    for (int i = 0; i < NUM_TASKS; ++i) {
        worker.schedule([&results, i] {
            // Delay tasks differently to provoke races.
            std::this_thread::sleep_for(std::chrono::nanoseconds(100 - i % 100));
            // Unguarded write to results to provoke races.
            results.push_back(i);
        });
    }

    std::promise<void> promise;
    auto future = promise.get_future();

    // Schedule a special task to signal when all of the tasks are finished.
    worker.schedule([&promise] { promise.set_value(); });
    auto status = future.wait_for(1s);
    ASSERT_EQ(status, std::future_status::ready);

    ASSERT_EQ(results.size(), NUM_TASKS);
    EXPECT_TRUE(std::is_sorted(results.begin(), results.end()));
}

TEST(WorkerThreadTest, ExecutionStopsAfterWorkerIsDestroyed) {
    std::promise<void> promise1;
    std::promise<void> promise2;
    auto future1 = promise1.get_future();
    auto future2 = promise2.get_future();

    {
        WorkerThread worker(2 /*maxQueueSize*/);
        worker.schedule([&promise1] {
            promise1.set_value();
            std::this_thread::sleep_for(200ms);
        });
        worker.schedule([&promise2] { promise2.set_value(); });

        // Make sure the first task is executing.
        auto status1 = future1.wait_for(1s);
        ASSERT_EQ(status1, std::future_status::ready);
    }

    // The second task should never execute.
    auto status2 = future2.wait_for(1s);
    EXPECT_EQ(status2, std::future_status::timeout);
}

}  // namespace