Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 33b5a6d8 authored by Mikhail Naganov's avatar Mikhail Naganov Committed by Automerger Merge Worker
Browse files

audio: Allow stopping a StreamWorker from the looping thread am: 48d31156

parents d1244176 48d31156
Loading
Loading
Loading
Loading
+28 −15
Original line number Original line Diff line number Diff line
@@ -29,11 +29,15 @@
#include <android-base/thread_annotations.h>
#include <android-base/thread_annotations.h>
#include <system/thread_defs.h>
#include <system/thread_defs.h>


namespace android::hardware::audio::common {

template <typename Impl>
template <typename Impl>
class StreamWorker {
class StreamWorker {
    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };


  public:
  public:
    enum class WorkerStatus { ABORT, CONTINUE, EXIT };

    StreamWorker() = default;
    StreamWorker() = default;
    ~StreamWorker() { stop(); }
    ~StreamWorker() { stop(); }
    // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
    // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
@@ -66,8 +70,7 @@ class StreamWorker {
    void stop() {
    void stop() {
        {
        {
            std::lock_guard<std::mutex> lock(mWorkerLock);
            std::lock_guard<std::mutex> lock(mWorkerLock);
            if (mError.empty()) {
            if (mWorkerState != WorkerState::STOPPED) {
                if (mWorkerState == WorkerState::STOPPED) return;
                mWorkerState = WorkerState::STOPPED;
                mWorkerState = WorkerState::STOPPED;
                mWorkerStateChangeRequest = true;
                mWorkerStateChangeRequest = true;
            }
            }
@@ -91,18 +94,22 @@ class StreamWorker {


    // Methods that need to be provided by subclasses:
    // Methods that need to be provided by subclasses:
    //
    //
    // Called once at the beginning of the thread loop. Must return
    // /* Called once at the beginning of the thread loop. Must return
    // an empty string to enter the thread loop, otherwise the thread loop
    //  * an empty string to enter the thread loop, otherwise the thread loop
    // exits and the worker switches into the 'error' state, setting
    //  * exits and the worker switches into the 'error' state, setting
    // the error to the returned value.
    //  * the error to the returned value.
    //  */
    // std::string workerInit();
    // std::string workerInit();
    //
    //
    // Called for each thread loop unless the thread is in 'paused' state.
    // /* Called for each thread loop unless the thread is in 'paused' state.
    // Must return 'true' to continue running, otherwise the thread loop
    //  * Must return 'CONTINUE' to continue running, otherwise the thread loop
    // exits and the worker switches into the 'error' state with a generic
    //  * exits. If the result from worker cycle is 'ABORT' then the worker switches
    // error message. It is recommended that the subclass reports any
    //  * into the 'error' state with a generic error message. It is recommended that
    // problems via logging facilities.
    //  * the subclass reports any problems via logging facilities. Returning the 'EXIT'
    // bool workerCycle();
    //  * status is equivalent to calling 'stop()' method. This is just a way of
    //  * of stopping the worker by its own initiative.
    //  */
    // WorkerStatus workerCycle();


  private:
  private:
    void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
    void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
@@ -146,8 +153,10 @@ class StreamWorker {


        for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
        for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
            bool needToNotify = false;
            bool needToNotify = false;
            if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
            if (WorkerStatus status = state != WorkerState::PAUSED
                                             : (sched_yield(), true)) {
                                              ? static_cast<Impl*>(this)->workerCycle()
                                              : (sched_yield(), WorkerStatus::CONTINUE);
                status == WorkerStatus::CONTINUE) {
                {
                {
                    // See https://developer.android.com/training/articles/smp#nonracing
                    // See https://developer.android.com/training/articles/smp#nonracing
                    android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
                    android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
@@ -188,7 +197,9 @@ class StreamWorker {
                    needToNotify = true;
                    needToNotify = true;
                }
                }
                state = mWorkerState = WorkerState::STOPPED;
                state = mWorkerState = WorkerState::STOPPED;
                mError = "workerCycle failed";
                if (status == WorkerStatus::ABORT) {
                    mError = "workerCycle aborted";
                }
            }
            }
            if (needToNotify) {
            if (needToNotify) {
                {
                {
@@ -218,3 +229,5 @@ class StreamWorker {
    static_assert(std::atomic<bool>::is_always_lock_free);
    static_assert(std::atomic<bool>::is_always_lock_free);
    std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
    std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
};
};

}  // namespace android::hardware::audio::common
+50 −19
Original line number Original line Diff line number Diff line
@@ -26,14 +26,18 @@
#define LOG_TAG "StreamWorker_Test"
#define LOG_TAG "StreamWorker_Test"
#include <log/log.h>
#include <log/log.h>


struct TestStream {
using android::hardware::audio::common::StreamWorker;
    std::atomic<bool> error = false;
};


class TestWorker : public StreamWorker<TestWorker> {
class TestWorker : public StreamWorker<TestWorker> {
  public:
  public:
    struct Stream {
        void setErrorStatus() { status = WorkerStatus::ABORT; }
        void setStopStatus() { status = WorkerStatus::EXIT; }
        std::atomic<WorkerStatus> status = WorkerStatus::CONTINUE;
    };

    // Use nullptr to test error reporting from the worker thread.
    // Use nullptr to test error reporting from the worker thread.
    explicit TestWorker(TestStream* stream) : mStream(stream) {}
    explicit TestWorker(Stream* stream) : mStream(stream) {}


    size_t getWorkerCycles() const { return mWorkerCycles; }
    size_t getWorkerCycles() const { return mWorkerCycles; }
    int getPriority() const { return mPriority; }
    int getPriority() const { return mPriority; }
@@ -45,16 +49,16 @@ class TestWorker : public StreamWorker<TestWorker> {
    }
    }


    std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
    std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
    bool workerCycle() {
    WorkerStatus workerCycle() {
        mPriority = getpriority(PRIO_PROCESS, 0);
        mPriority = getpriority(PRIO_PROCESS, 0);
        do {
        do {
            mWorkerCycles++;
            mWorkerCycles++;
        } while (mWorkerCycles == 0);
        } while (mWorkerCycles == 0);
        return !mStream->error;
        return mStream->status;
    }
    }


  private:
  private:
    TestStream* const mStream;
    Stream* const mStream;
    std::atomic<size_t> mWorkerCycles = 0;
    std::atomic<size_t> mWorkerCycles = 0;
    std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
    std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
};
};
@@ -70,7 +74,8 @@ class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
    }
    }


  protected:
  protected:
    StreamWorkerInvalidTest(TestStream* stream) : testing::TestWithParam<bool>(), worker(stream) {}
    StreamWorkerInvalidTest(TestWorker::Stream* stream)
        : testing::TestWithParam<bool>(), worker(stream) {}
    TestWorker worker;
    TestWorker worker;
};
};


@@ -118,7 +123,7 @@ class StreamWorkerTest : public StreamWorkerInvalidTest {
    StreamWorkerTest() : StreamWorkerInvalidTest(&stream) {}
    StreamWorkerTest() : StreamWorkerInvalidTest(&stream) {}


  protected:
  protected:
    TestStream stream;
    TestWorker::Stream stream;
};
};


static constexpr unsigned kWorkerIdleCheckTime = 50 * 1000;
static constexpr unsigned kWorkerIdleCheckTime = 50 * 1000;
@@ -130,21 +135,47 @@ TEST_P(StreamWorkerTest, Uninitialized) {


TEST_P(StreamWorkerTest, Start) {
TEST_P(StreamWorkerTest, Start) {
    ASSERT_TRUE(worker.start());
    ASSERT_TRUE(worker.start());
    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerTest, StartStop) {
    ASSERT_TRUE(worker.start());
    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
    EXPECT_FALSE(worker.hasError());
    worker.stop();
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerTest, WorkerExit) {
    ASSERT_TRUE(worker.start());
    stream.setStopStatus();
    worker.waitForAtLeastOneCycle();
    worker.waitForAtLeastOneCycle();
    EXPECT_FALSE(worker.hasError());
    EXPECT_FALSE(worker.hasError());
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
}
}


TEST_P(StreamWorkerTest, WorkerError) {
TEST_P(StreamWorkerTest, WorkerError) {
    ASSERT_TRUE(worker.start());
    ASSERT_TRUE(worker.start());
    stream.error = true;
    stream.setErrorStatus();
    worker.waitForAtLeastOneCycle();
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.hasError());
    EXPECT_TRUE(worker.hasError());
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
}
}


TEST_P(StreamWorkerTest, PauseResume) {
TEST_P(StreamWorkerTest, StopAfterError) {
    ASSERT_TRUE(worker.start());
    ASSERT_TRUE(worker.start());
    stream.setErrorStatus();
    worker.waitForAtLeastOneCycle();
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.hasError());
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
    worker.stop();
    EXPECT_TRUE(worker.hasError());
}

TEST_P(StreamWorkerTest, PauseResume) {
    ASSERT_TRUE(worker.start());
    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
    EXPECT_FALSE(worker.hasError());
    EXPECT_FALSE(worker.hasError());
    worker.pause();
    worker.pause();
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
@@ -158,7 +189,7 @@ TEST_P(StreamWorkerTest, PauseResume) {


TEST_P(StreamWorkerTest, StopPaused) {
TEST_P(StreamWorkerTest, StopPaused) {
    ASSERT_TRUE(worker.start());
    ASSERT_TRUE(worker.start());
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
    EXPECT_FALSE(worker.hasError());
    EXPECT_FALSE(worker.hasError());
    worker.pause();
    worker.pause();
    worker.stop();
    worker.stop();
@@ -167,7 +198,7 @@ TEST_P(StreamWorkerTest, StopPaused) {


TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) {
TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) {
    ASSERT_TRUE(worker.start());
    ASSERT_TRUE(worker.start());
    stream.error = true;
    stream.setErrorStatus();
    worker.waitForAtLeastOneCycle();
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.hasError());
    EXPECT_TRUE(worker.hasError());
    worker.pause();
    worker.pause();
@@ -177,7 +208,7 @@ TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) {


TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) {
TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) {
    ASSERT_TRUE(worker.start());
    ASSERT_TRUE(worker.start());
    stream.error = true;
    stream.setErrorStatus();
    worker.waitForAtLeastOneCycle();
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.hasError());
    EXPECT_TRUE(worker.hasError());
    worker.resume();
    worker.resume();
@@ -187,11 +218,11 @@ TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) {


TEST_P(StreamWorkerTest, WorkerErrorOnResume) {
TEST_P(StreamWorkerTest, WorkerErrorOnResume) {
    ASSERT_TRUE(worker.start());
    ASSERT_TRUE(worker.start());
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
    EXPECT_FALSE(worker.hasError());
    EXPECT_FALSE(worker.hasError());
    worker.pause();
    worker.pause();
    EXPECT_FALSE(worker.hasError());
    EXPECT_FALSE(worker.hasError());
    stream.error = true;
    stream.setErrorStatus();
    EXPECT_FALSE(worker.hasError());
    EXPECT_FALSE(worker.hasError());
    worker.resume();
    worker.resume();
    worker.waitForAtLeastOneCycle();
    worker.waitForAtLeastOneCycle();
@@ -208,7 +239,7 @@ TEST_P(StreamWorkerTest, WaitForAtLeastOneCycle) {


TEST_P(StreamWorkerTest, WaitForAtLeastOneCycleError) {
TEST_P(StreamWorkerTest, WaitForAtLeastOneCycleError) {
    ASSERT_TRUE(worker.start());
    ASSERT_TRUE(worker.start());
    stream.error = true;
    stream.setErrorStatus();
    EXPECT_FALSE(worker.waitForAtLeastOneCycle());
    EXPECT_FALSE(worker.waitForAtLeastOneCycle());
}
}


@@ -220,7 +251,7 @@ TEST_P(StreamWorkerTest, MutexDoesNotBlockWorker) {
        usleep(kWorkerIdleCheckTime);
        usleep(kWorkerIdleCheckTime);
    }
    }
    worker.testLockUnlockMutex(false);
    worker.testLockUnlockMutex(false);
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
    EXPECT_FALSE(worker.hasError());
    EXPECT_FALSE(worker.hasError());
}
}


@@ -235,7 +266,7 @@ TEST_P(StreamWorkerTest, ThreadName) {
TEST_P(StreamWorkerTest, ThreadPriority) {
TEST_P(StreamWorkerTest, ThreadPriority) {
    const int priority = ANDROID_PRIORITY_LOWEST;
    const int priority = ANDROID_PRIORITY_LOWEST;
    ASSERT_TRUE(worker.start("", priority)) << worker.getError();
    ASSERT_TRUE(worker.start("", priority)) << worker.getError();
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
    EXPECT_EQ(priority, worker.getPriority());
    EXPECT_EQ(priority, worker.getPriority());
}
}