Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 55b6ead6 authored by Mikhail Naganov's avatar Mikhail Naganov Committed by Automerger Merge Worker
Browse files

audio: Implement setting name and priority in StreamWorker am: 48e2e8fe

parents d6070e0a 48e2e8fe
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -30,9 +30,11 @@ cc_library_headers {
    export_include_dirs: ["include"],
    header_libs: [
        "libbase_headers",
        "libsystem_headers",
    ],
    export_header_lib_headers: [
        "libbase_headers",
        "libsystem_headers",
    ],
}

+53 −16
Original line number Diff line number Diff line
@@ -16,29 +16,39 @@

#pragma once

#include <pthread.h>
#include <sched.h>
#include <sys/resource.h>

#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>

#include <android-base/thread_annotations.h>
#include <system/thread_defs.h>

template <typename Impl>
class StreamWorker {
    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR };
    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };

  public:
    StreamWorker() = default;
    ~StreamWorker() { stop(); }
    bool start() {
    // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
    // The nice number is used with the default scheduler. For threads that
    // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
    // it is recommended to implement an appropriate configuration sequence within `workerInit`.
    bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
        mThreadName = name;
        mThreadPriority = priority;
        mWorker = std::thread(&StreamWorker::workerThread, this);
        std::unique_lock<std::mutex> lock(mWorkerLock);
        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
        mWorkerCv.wait(lock, [&]() {
            android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
            return mWorkerState != WorkerState::STOPPED;
            return mWorkerState == WorkerState::RUNNING || !mError.empty();
        });
        mWorkerStateChangeRequest = false;
        return mWorkerState == WorkerState::RUNNING;
@@ -47,15 +57,21 @@ class StreamWorker {
    void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
    bool hasError() {
        std::lock_guard<std::mutex> lock(mWorkerLock);
        return mWorkerState == WorkerState::ERROR;
        return !mError.empty();
    }
    std::string getError() {
        std::lock_guard<std::mutex> lock(mWorkerLock);
        return mError;
    }
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mWorkerLock);
            if (mError.empty()) {
                if (mWorkerState == WorkerState::STOPPED) return;
                mWorkerState = WorkerState::STOPPED;
                mWorkerStateChangeRequest = true;
            }
        }
        if (mWorker.joinable()) {
            mWorker.join();
        }
@@ -71,17 +87,21 @@ class StreamWorker {
    void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
        lock ? mWorkerLock.lock() : mWorkerLock.unlock();
    }
    std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }

    // Methods that need to be provided by subclasses:
    //
    // Called once at the beginning of the thread loop. Must return
    // 'true' to enter the thread loop, otherwise the thread loop
    // exits and the worker switches into the 'error' state.
    // bool workerInit();
    // an empty string to enter the thread loop, otherwise the thread loop
    // exits and the worker switches into the 'error' state, setting
    // the error to the returned value.
    // std::string workerInit();
    //
    // Called for each thread loop unless the thread is in 'paused' state.
    // Must return 'true' to continue running, otherwise the thread loop
    // exits and the worker switches into the 'error' state.
    // exits and the worker switches into the 'error' state with a generic
    // error message. It is recommended that the subclass reports any
    // problems via logging facilities.
    // bool workerCycle();

  private:
@@ -102,13 +122,27 @@ class StreamWorker {
        if (finalState) *finalState = mWorkerState;
    }
    void workerThread() {
        bool success = static_cast<Impl*>(this)->workerInit();
        std::string error = static_cast<Impl*>(this)->workerInit();
        if (error.empty() && !mThreadName.empty()) {
            std::string compliantName(mThreadName.substr(0, 15));
            if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
                errCode != 0) {
                error.append("Failed to set thread name: ").append(strerror(errCode));
            }
        }
        if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
            if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
                int errCode = errno;
                error.append("Failed to set thread priority: ").append(strerror(errCode));
            }
        }
        {
            std::lock_guard<std::mutex> lock(mWorkerLock);
            mWorkerState = success ? WorkerState::RUNNING : WorkerState::ERROR;
            mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
            mError = error;
        }
        mWorkerCv.notify_one();
        if (!success) return;
        if (!error.empty()) return;

        for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
            bool needToNotify = false;
@@ -153,8 +187,8 @@ class StreamWorker {
                    mWorkerState == WorkerState::PAUSE_REQUESTED) {
                    needToNotify = true;
                }
                mWorkerState = WorkerState::ERROR;
                state = WorkerState::STOPPED;
                state = mWorkerState = WorkerState::STOPPED;
                mError = "workerCycle failed";
            }
            if (needToNotify) {
                {
@@ -166,10 +200,13 @@ class StreamWorker {
        }
    }

    std::string mThreadName;
    int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
    std::thread mWorker;
    std::mutex mWorkerLock;
    std::condition_variable mWorkerCv;
    WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
    std::string mError GUARDED_BY(mWorkerLock);
    // The atomic lock-free variable is used to prevent priority inversions
    // that can occur when a high priority worker tries to acquire the lock
    // which has been taken by a lower priority control thread which in its turn
+21 −1
Original line number Diff line number Diff line
@@ -14,8 +14,10 @@
 * limitations under the License.
 */

#include <pthread.h>
#include <sched.h>
#include <unistd.h>

#include <atomic>

#include <StreamWorker.h>
@@ -34,6 +36,7 @@ class TestWorker : public StreamWorker<TestWorker> {
    explicit TestWorker(TestStream* stream) : mStream(stream) {}

    size_t getWorkerCycles() const { return mWorkerCycles; }
    int getPriority() const { return mPriority; }
    bool hasWorkerCycleCalled() const { return mWorkerCycles != 0; }
    bool hasNoWorkerCycleCalled(useconds_t usec) {
        const size_t cyclesBefore = mWorkerCycles;
@@ -41,8 +44,9 @@ class TestWorker : public StreamWorker<TestWorker> {
        return mWorkerCycles == cyclesBefore;
    }

    bool workerInit() { return mStream; }
    std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
    bool workerCycle() {
        mPriority = getpriority(PRIO_PROCESS, 0);
        do {
            mWorkerCycles++;
        } while (mWorkerCycles == 0);
@@ -52,6 +56,7 @@ class TestWorker : public StreamWorker<TestWorker> {
  private:
    TestStream* const mStream;
    std::atomic<size_t> mWorkerCycles = 0;
    std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
};

// The parameter specifies whether an extra call to 'stop' is made at the end.
@@ -219,4 +224,19 @@ TEST_P(StreamWorkerTest, MutexDoesNotBlockWorker) {
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerTest, ThreadName) {
    const std::string workerName = "TestWorker";
    ASSERT_TRUE(worker.start(workerName)) << worker.getError();
    char nameBuf[128];
    ASSERT_EQ(0, pthread_getname_np(worker.testGetThreadNativeHandle(), nameBuf, sizeof(nameBuf)));
    EXPECT_EQ(workerName, nameBuf);
}

TEST_P(StreamWorkerTest, ThreadPriority) {
    const int priority = ANDROID_PRIORITY_LOWEST;
    ASSERT_TRUE(worker.start("", priority)) << worker.getError();
    worker.waitForAtLeastOneCycle();
    EXPECT_EQ(priority, worker.getPriority());
}

INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());