Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 614e4b5f authored by Mikhail Naganov's avatar Mikhail Naganov
Browse files

audio: Add StreamWorker to aidl/common

This utility class has been copied from HIDL VTS.
It will be used both for the default implementation
and AIDL VTS, and might need modifications.

Bug: 205884982
Test: atest libaudioaidlcommon_test
Merged-In: I43b35b0c23ae45305dca66e15b60820cad19635e
Change-Id: I43b35b0c23ae45305dca66e15b60820cad19635e
(cherry picked from commit c17f0484)
parent b4adbffd
Loading
Loading
Loading
Loading
+61 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package {
    // See: http://go/android-license-faq
    // A large-scale-change added 'default_applicable_licenses' to import
    // all of the 'license_kinds' from "hardware_interfaces_license"
    // to get the below license kinds:
    //   SPDX-license-identifier-Apache-2.0
    default_applicable_licenses: ["hardware_interfaces_license"],
}

cc_library_headers {
    name: "libaudioaidlcommon",
    host_supported: true,
    vendor_available: true,
    export_include_dirs: ["include"],
    header_libs: [
        "libbase_headers",
    ],
    export_header_lib_headers: [
        "libbase_headers",
    ],
}

cc_test {
    name: "libaudioaidlcommon_test",
    host_supported: true,
    vendor_available: true,
    header_libs: [
        "libaudioaidlcommon",
    ],
    shared_libs: [
        "liblog",
    ],
    cflags: [
        "-Wall",
        "-Wextra",
        "-Werror",
        "-Wthread-safety",
    ],
    srcs: [
        "tests/streamworker_tests.cpp",
    ],
    test_suites: [
        "general-tests",
    ],
}
+7 −0
Original line number Diff line number Diff line
{
  "presubmit": [
    {
      "name": "libaudioaidlcommon_test"
    }
  ]
}
+156 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#pragma once

#include <sched.h>

#include <condition_variable>
#include <mutex>
#include <thread>

#include <android-base/thread_annotations.h>

template <typename Impl>
class StreamWorker {
    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED, ERROR };

  public:
    StreamWorker() = default;
    ~StreamWorker() { stop(); }
    bool start() {
        mWorker = std::thread(&StreamWorker::workerThread, this);
        std::unique_lock<std::mutex> lock(mWorkerLock);
        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
        mWorkerCv.wait(lock, [&]() {
            android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
            return mWorkerState != WorkerState::STOPPED;
        });
        return mWorkerState == WorkerState::RUNNING;
    }
    void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
    void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
    bool hasError() {
        std::lock_guard<std::mutex> lock(mWorkerLock);
        return mWorkerState == WorkerState::ERROR;
    }
    void stop() {
        {
            std::lock_guard<std::mutex> lock(mWorkerLock);
            if (mWorkerState == WorkerState::STOPPED) return;
            mWorkerState = WorkerState::STOPPED;
        }
        if (mWorker.joinable()) {
            mWorker.join();
        }
    }
    bool waitForAtLeastOneCycle() {
        WorkerState newState;
        switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
        if (newState != WorkerState::PAUSED) return false;
        switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
        return newState == WorkerState::RUNNING;
    }

    // Methods that need to be provided by subclasses:
    //
    // Called once at the beginning of the thread loop. Must return
    // 'true' to enter the thread loop, otherwise the thread loop
    // exits and the worker switches into the 'error' state.
    // bool workerInit();
    //
    // Called for each thread loop unless the thread is in 'paused' state.
    // Must return 'true' to continue running, otherwise the thread loop
    // exits and the worker switches into the 'error' state.
    // bool workerCycle();

  private:
    void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
                               WorkerState* finalState = nullptr) {
        std::unique_lock<std::mutex> lock(mWorkerLock);
        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
        if (mWorkerState != oldState) {
            if (finalState) *finalState = mWorkerState;
            return;
        }
        mWorkerState = newState;
        mWorkerCv.wait(lock, [&]() {
            android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
            return mWorkerState != newState;
        });
        if (finalState) *finalState = mWorkerState;
    }
    void workerThread() {
        bool success = static_cast<Impl*>(this)->workerInit();
        {
            std::lock_guard<std::mutex> lock(mWorkerLock);
            mWorkerState = success ? WorkerState::RUNNING : WorkerState::ERROR;
        }
        mWorkerCv.notify_one();
        if (!success) return;

        for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
            bool needToNotify = false;
            if (state != WorkerState::PAUSED ? static_cast<Impl*>(this)->workerCycle()
                                             : (sched_yield(), true)) {
                //
                // Pause and resume are synchronous. One worker cycle must complete
                // before the worker indicates a state change. This is how 'mWorkerState' and
                // 'state' interact:
                //
                // mWorkerState == RUNNING
                // client sets mWorkerState := PAUSE_REQUESTED
                // last workerCycle gets executed, state := mWorkerState := PAUSED by us
                //   (or the workers enters the 'error' state if workerCycle fails)
                // client gets notified about state change in any case
                // thread is doing a busy wait while 'state == PAUSED'
                // client sets mWorkerState := RESUME_REQUESTED
                // state := mWorkerState (RESUME_REQUESTED)
                // mWorkerState := RUNNING, but we don't notify the client yet
                // first workerCycle gets executed, the code below triggers a client notification
                //   (or if workerCycle fails, worker enters 'error' state and also notifies)
                // state := mWorkerState (RUNNING)
                if (state == WorkerState::RESUME_REQUESTED) {
                    needToNotify = true;
                }
                std::lock_guard<std::mutex> lock(mWorkerLock);
                state = mWorkerState;
                if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
                    state = mWorkerState = WorkerState::PAUSED;
                    needToNotify = true;
                } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
                    mWorkerState = WorkerState::RUNNING;
                }
            } else {
                std::lock_guard<std::mutex> lock(mWorkerLock);
                if (state == WorkerState::RESUME_REQUESTED ||
                    mWorkerState == WorkerState::PAUSE_REQUESTED) {
                    needToNotify = true;
                }
                mWorkerState = WorkerState::ERROR;
                state = WorkerState::STOPPED;
            }
            if (needToNotify) {
                mWorkerCv.notify_one();
            }
        }
    }

    std::thread mWorker;
    std::mutex mWorkerLock;
    std::condition_variable mWorkerCv;
    WorkerState mWorkerState GUARDED_BY(mWorkerLock) = WorkerState::STOPPED;
};
+210 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <sched.h>
#include <unistd.h>
#include <atomic>

#include <StreamWorker.h>

#include <gtest/gtest.h>
#define LOG_TAG "StreamWorker_Test"
#include <log/log.h>

struct TestStream {
    std::atomic<bool> error = false;
};

class TestWorker : public StreamWorker<TestWorker> {
  public:
    // Use nullptr to test error reporting from the worker thread.
    explicit TestWorker(TestStream* stream) : mStream(stream) {}

    size_t getWorkerCycles() const { return mWorkerCycles; }
    bool hasWorkerCycleCalled() const { return mWorkerCycles != 0; }
    bool hasNoWorkerCycleCalled(useconds_t usec) {
        const size_t cyclesBefore = mWorkerCycles;
        usleep(usec);
        return mWorkerCycles == cyclesBefore;
    }

    bool workerInit() { return mStream; }
    bool workerCycle() {
        do {
            mWorkerCycles++;
        } while (mWorkerCycles == 0);
        return !mStream->error;
    }

  private:
    TestStream* const mStream;
    std::atomic<size_t> mWorkerCycles = 0;
};

// The parameter specifies whether an extra call to 'stop' is made at the end.
class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
  public:
    StreamWorkerInvalidTest() : StreamWorkerInvalidTest(nullptr) {}
    void TearDown() override {
        if (GetParam()) {
            worker.stop();
        }
    }

  protected:
    StreamWorkerInvalidTest(TestStream* stream) : testing::TestWithParam<bool>(), worker(stream) {}
    TestWorker worker;
};

TEST_P(StreamWorkerInvalidTest, Uninitialized) {
    EXPECT_FALSE(worker.hasWorkerCycleCalled());
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerInvalidTest, UninitializedPauseIgnored) {
    EXPECT_FALSE(worker.hasError());
    worker.pause();
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerInvalidTest, UninitializedResumeIgnored) {
    EXPECT_FALSE(worker.hasError());
    worker.resume();
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerInvalidTest, Start) {
    EXPECT_FALSE(worker.start());
    EXPECT_FALSE(worker.hasWorkerCycleCalled());
    EXPECT_TRUE(worker.hasError());
}

TEST_P(StreamWorkerInvalidTest, PauseIgnored) {
    EXPECT_FALSE(worker.start());
    EXPECT_TRUE(worker.hasError());
    worker.pause();
    EXPECT_TRUE(worker.hasError());
}

TEST_P(StreamWorkerInvalidTest, ResumeIgnored) {
    EXPECT_FALSE(worker.start());
    EXPECT_TRUE(worker.hasError());
    worker.resume();
    EXPECT_TRUE(worker.hasError());
}

INSTANTIATE_TEST_SUITE_P(StreamWorkerInvalid, StreamWorkerInvalidTest, testing::Bool());

class StreamWorkerTest : public StreamWorkerInvalidTest {
  public:
    StreamWorkerTest() : StreamWorkerInvalidTest(&stream) {}

  protected:
    TestStream stream;
};

static constexpr unsigned kWorkerIdleCheckTime = 50 * 1000;

TEST_P(StreamWorkerTest, Uninitialized) {
    EXPECT_FALSE(worker.hasWorkerCycleCalled());
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerTest, Start) {
    ASSERT_TRUE(worker.start());
    worker.waitForAtLeastOneCycle();
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerTest, WorkerError) {
    ASSERT_TRUE(worker.start());
    stream.error = true;
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.hasError());
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
}

TEST_P(StreamWorkerTest, PauseResume) {
    ASSERT_TRUE(worker.start());
    worker.waitForAtLeastOneCycle();
    EXPECT_FALSE(worker.hasError());
    worker.pause();
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
    EXPECT_FALSE(worker.hasError());
    const size_t workerCyclesBefore = worker.getWorkerCycles();
    worker.resume();
    // 'resume' is synchronous and returns after the worker has looped at least once.
    EXPECT_GT(worker.getWorkerCycles(), workerCyclesBefore);
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerTest, StopPaused) {
    ASSERT_TRUE(worker.start());
    worker.waitForAtLeastOneCycle();
    EXPECT_FALSE(worker.hasError());
    worker.pause();
    worker.stop();
    EXPECT_FALSE(worker.hasError());
}

TEST_P(StreamWorkerTest, PauseAfterErrorIgnored) {
    ASSERT_TRUE(worker.start());
    stream.error = true;
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.hasError());
    worker.pause();
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
    EXPECT_TRUE(worker.hasError());
}

TEST_P(StreamWorkerTest, ResumeAfterErrorIgnored) {
    ASSERT_TRUE(worker.start());
    stream.error = true;
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.hasError());
    worker.resume();
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
    EXPECT_TRUE(worker.hasError());
}

TEST_P(StreamWorkerTest, WorkerErrorOnResume) {
    ASSERT_TRUE(worker.start());
    worker.waitForAtLeastOneCycle();
    EXPECT_FALSE(worker.hasError());
    worker.pause();
    EXPECT_FALSE(worker.hasError());
    stream.error = true;
    EXPECT_FALSE(worker.hasError());
    worker.resume();
    worker.waitForAtLeastOneCycle();
    EXPECT_TRUE(worker.hasError());
    EXPECT_TRUE(worker.hasNoWorkerCycleCalled(kWorkerIdleCheckTime));
}

TEST_P(StreamWorkerTest, WaitForAtLeastOneCycle) {
    ASSERT_TRUE(worker.start());
    const size_t workerCyclesBefore = worker.getWorkerCycles();
    EXPECT_TRUE(worker.waitForAtLeastOneCycle());
    EXPECT_GT(worker.getWorkerCycles(), workerCyclesBefore);
}

TEST_P(StreamWorkerTest, WaitForAtLeastOneCycleError) {
    ASSERT_TRUE(worker.start());
    stream.error = true;
    EXPECT_FALSE(worker.waitForAtLeastOneCycle());
}

INSTANTIATE_TEST_SUITE_P(StreamWorker, StreamWorkerTest, testing::Bool());