Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 231ca12c authored by Mikhail Naganov's avatar Mikhail Naganov Committed by Automerger Merge Worker
Browse files

audio: Fix the lifetime of the StreamWorker's logic part am: 0b9c5fee am: 4b279d6a

parents 8883713e 4b279d6a
Loading
Loading
Loading
Loading
+5 −2
Original line number Original line Diff line number Diff line
@@ -23,7 +23,7 @@ package {
    default_applicable_licenses: ["hardware_interfaces_license"],
    default_applicable_licenses: ["hardware_interfaces_license"],
}
}


cc_library_headers {
cc_library {
    name: "libaudioaidlcommon",
    name: "libaudioaidlcommon",
    host_supported: true,
    host_supported: true,
    vendor_available: true,
    vendor_available: true,
@@ -36,13 +36,16 @@ cc_library_headers {
        "libbase_headers",
        "libbase_headers",
        "libsystem_headers",
        "libsystem_headers",
    ],
    ],
    srcs: [
        "StreamWorker.cpp",
    ],
}
}


cc_test {
cc_test {
    name: "libaudioaidlcommon_test",
    name: "libaudioaidlcommon_test",
    host_supported: true,
    host_supported: true,
    vendor_available: true,
    vendor_available: true,
    header_libs: [
    static_libs: [
        "libaudioaidlcommon",
        "libaudioaidlcommon",
    ],
    ],
    shared_libs: [
    shared_libs: [
+160 −0
Original line number Original line Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <pthread.h>
#include <sched.h>
#include <sys/resource.h>

#include "include/StreamWorker.h"

namespace android::hardware::audio::common::internal {

bool ThreadController::start(const std::string& name, int priority) {
    mThreadName = name;
    mThreadPriority = priority;
    mWorker = std::thread(&ThreadController::workerThread, this);
    std::unique_lock<std::mutex> lock(mWorkerLock);
    android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
    mWorkerCv.wait(lock, [&]() {
        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
        return mWorkerState == WorkerState::RUNNING || !mError.empty();
    });
    mWorkerStateChangeRequest = false;
    return mWorkerState == WorkerState::RUNNING;
}

void ThreadController::stop() {
    {
        std::lock_guard<std::mutex> lock(mWorkerLock);
        if (mWorkerState != WorkerState::STOPPED) {
            mWorkerState = WorkerState::STOPPED;
            mWorkerStateChangeRequest = true;
        }
    }
    if (mWorker.joinable()) {
        mWorker.join();
    }
}

bool ThreadController::waitForAtLeastOneCycle() {
    WorkerState newState;
    switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
    if (newState != WorkerState::PAUSED) return false;
    switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
    return newState == WorkerState::RUNNING;
}

void ThreadController::switchWorkerStateSync(WorkerState oldState, WorkerState newState,
                                             WorkerState* finalState) {
    std::unique_lock<std::mutex> lock(mWorkerLock);
    android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
    if (mWorkerState != oldState) {
        if (finalState) *finalState = mWorkerState;
        return;
    }
    mWorkerState = newState;
    mWorkerStateChangeRequest = true;
    mWorkerCv.wait(lock, [&]() {
        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
        return mWorkerState != newState;
    });
    if (finalState) *finalState = mWorkerState;
}

void ThreadController::workerThread() {
    using Status = StreamLogic::Status;

    std::string error = mLogic->init();
    if (error.empty() && !mThreadName.empty()) {
        std::string compliantName(mThreadName.substr(0, 15));
        if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str()); errCode != 0) {
            error.append("Failed to set thread name: ").append(strerror(errCode));
        }
    }
    if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
        if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
            int errCode = errno;
            error.append("Failed to set thread priority: ").append(strerror(errCode));
        }
    }
    {
        std::lock_guard<std::mutex> lock(mWorkerLock);
        mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
        mError = error;
    }
    mWorkerCv.notify_one();
    if (!error.empty()) return;

    for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
        bool needToNotify = false;
        if (Status status = state != WorkerState::PAUSED ? mLogic->cycle()
                                                         : (sched_yield(), Status::CONTINUE);
            status == Status::CONTINUE) {
            {
                // See https://developer.android.com/training/articles/smp#nonracing
                android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
                if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
            }
            //
            // Pause and resume are synchronous. One worker cycle must complete
            // before the worker indicates a state change. This is how 'mWorkerState' and
            // 'state' interact:
            //
            // mWorkerState == RUNNING
            // client sets mWorkerState := PAUSE_REQUESTED
            // last workerCycle gets executed, state := mWorkerState := PAUSED by us
            //   (or the workers enters the 'error' state if workerCycle fails)
            // client gets notified about state change in any case
            // thread is doing a busy wait while 'state == PAUSED'
            // client sets mWorkerState := RESUME_REQUESTED
            // state := mWorkerState (RESUME_REQUESTED)
            // mWorkerState := RUNNING, but we don't notify the client yet
            // first workerCycle gets executed, the code below triggers a client notification
            //   (or if workerCycle fails, worker enters 'error' state and also notifies)
            // state := mWorkerState (RUNNING)
            std::lock_guard<std::mutex> lock(mWorkerLock);
            if (state == WorkerState::RESUME_REQUESTED) {
                needToNotify = true;
            }
            state = mWorkerState;
            if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
                state = mWorkerState = WorkerState::PAUSED;
                needToNotify = true;
            } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
                mWorkerState = WorkerState::RUNNING;
            }
        } else {
            std::lock_guard<std::mutex> lock(mWorkerLock);
            if (state == WorkerState::RESUME_REQUESTED ||
                mWorkerState == WorkerState::PAUSE_REQUESTED) {
                needToNotify = true;
            }
            state = mWorkerState = WorkerState::STOPPED;
            if (status == Status::ABORT) {
                mError = "Received ABORT from the logic cycle";
            }
        }
        if (needToNotify) {
            {
                std::lock_guard<std::mutex> lock(mWorkerLock);
                mWorkerStateChangeRequest = false;
            }
            mWorkerCv.notify_one();
        }
    }
}

}  // namespace android::hardware::audio::common::internal
+83 −163
Original line number Original line Diff line number Diff line
@@ -16,10 +16,6 @@


#pragma once
#pragma once


#include <pthread.h>
#include <sched.h>
#include <sys/resource.h>

#include <atomic>
#include <atomic>
#include <condition_variable>
#include <condition_variable>
#include <mutex>
#include <mutex>
@@ -31,32 +27,18 @@


namespace android::hardware::audio::common {
namespace android::hardware::audio::common {


template <typename Impl>
class StreamLogic;
class StreamWorker {

namespace internal {

class ThreadController {
    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };
    enum class WorkerState { STOPPED, RUNNING, PAUSE_REQUESTED, PAUSED, RESUME_REQUESTED };


  public:
  public:
    enum class WorkerStatus { ABORT, CONTINUE, EXIT };
    explicit ThreadController(StreamLogic* logic) : mLogic(logic) {}
    ~ThreadController() { stop(); }


    StreamWorker() = default;
    bool start(const std::string& name, int priority);
    ~StreamWorker() { stop(); }
    // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
    // The nice number is used with the default scheduler. For threads that
    // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
    // it is recommended to implement an appropriate configuration sequence within `workerInit`.
    bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
        mThreadName = name;
        mThreadPriority = priority;
        mWorker = std::thread(&StreamWorker::workerThread, this);
        std::unique_lock<std::mutex> lock(mWorkerLock);
        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
        mWorkerCv.wait(lock, [&]() {
            android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
            return mWorkerState == WorkerState::RUNNING || !mError.empty();
        });
        mWorkerStateChangeRequest = false;
        return mWorkerState == WorkerState::RUNNING;
    }
    void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
    void pause() { switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED); }
    void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
    void resume() { switchWorkerStateSync(WorkerState::PAUSED, WorkerState::RESUME_REQUESTED); }
    bool hasError() {
    bool hasError() {
@@ -67,150 +49,21 @@ class StreamWorker {
        std::lock_guard<std::mutex> lock(mWorkerLock);
        std::lock_guard<std::mutex> lock(mWorkerLock);
        return mError;
        return mError;
    }
    }
    void stop() {
    void stop();
        {
    bool waitForAtLeastOneCycle();
            std::lock_guard<std::mutex> lock(mWorkerLock);

            if (mWorkerState != WorkerState::STOPPED) {
                mWorkerState = WorkerState::STOPPED;
                mWorkerStateChangeRequest = true;
            }
        }
        if (mWorker.joinable()) {
            mWorker.join();
        }
    }
    bool waitForAtLeastOneCycle() {
        WorkerState newState;
        switchWorkerStateSync(WorkerState::RUNNING, WorkerState::PAUSE_REQUESTED, &newState);
        if (newState != WorkerState::PAUSED) return false;
        switchWorkerStateSync(newState, WorkerState::RESUME_REQUESTED, &newState);
        return newState == WorkerState::RUNNING;
    }
    // Only used by unit tests.
    // Only used by unit tests.
    void testLockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
    void lockUnlockMutex(bool lock) NO_THREAD_SAFETY_ANALYSIS {
        lock ? mWorkerLock.lock() : mWorkerLock.unlock();
        lock ? mWorkerLock.lock() : mWorkerLock.unlock();
    }
    }
    std::thread::native_handle_type testGetThreadNativeHandle() { return mWorker.native_handle(); }
    std::thread::native_handle_type getThreadNativeHandle() { return mWorker.native_handle(); }

    // Methods that need to be provided by subclasses:
    //
    // /* Called once at the beginning of the thread loop. Must return
    //  * an empty string to enter the thread loop, otherwise the thread loop
    //  * exits and the worker switches into the 'error' state, setting
    //  * the error to the returned value.
    //  */
    // std::string workerInit();
    //
    // /* Called for each thread loop unless the thread is in 'paused' state.
    //  * Must return 'CONTINUE' to continue running, otherwise the thread loop
    //  * exits. If the result from worker cycle is 'ABORT' then the worker switches
    //  * into the 'error' state with a generic error message. It is recommended that
    //  * the subclass reports any problems via logging facilities. Returning the 'EXIT'
    //  * status is equivalent to calling 'stop()' method. This is just a way of
    //  * of stopping the worker by its own initiative.
    //  */
    // WorkerStatus workerCycle();


  private:
  private:
    void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
    void switchWorkerStateSync(WorkerState oldState, WorkerState newState,
                               WorkerState* finalState = nullptr) {
                               WorkerState* finalState = nullptr);
        std::unique_lock<std::mutex> lock(mWorkerLock);
    void workerThread();
        android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
        if (mWorkerState != oldState) {
            if (finalState) *finalState = mWorkerState;
            return;
        }
        mWorkerState = newState;
        mWorkerStateChangeRequest = true;
        mWorkerCv.wait(lock, [&]() {
            android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
            return mWorkerState != newState;
        });
        if (finalState) *finalState = mWorkerState;
    }
    void workerThread() {
        std::string error = static_cast<Impl*>(this)->workerInit();
        if (error.empty() && !mThreadName.empty()) {
            std::string compliantName(mThreadName.substr(0, 15));
            if (int errCode = pthread_setname_np(pthread_self(), compliantName.c_str());
                errCode != 0) {
                error.append("Failed to set thread name: ").append(strerror(errCode));
            }
        }
        if (error.empty() && mThreadPriority != ANDROID_PRIORITY_DEFAULT) {
            if (int result = setpriority(PRIO_PROCESS, 0, mThreadPriority); result != 0) {
                int errCode = errno;
                error.append("Failed to set thread priority: ").append(strerror(errCode));
            }
        }
        {
            std::lock_guard<std::mutex> lock(mWorkerLock);
            mWorkerState = error.empty() ? WorkerState::RUNNING : WorkerState::STOPPED;
            mError = error;
        }
        mWorkerCv.notify_one();
        if (!error.empty()) return;

        for (WorkerState state = WorkerState::RUNNING; state != WorkerState::STOPPED;) {
            bool needToNotify = false;
            if (WorkerStatus status = state != WorkerState::PAUSED
                                              ? static_cast<Impl*>(this)->workerCycle()
                                              : (sched_yield(), WorkerStatus::CONTINUE);
                status == WorkerStatus::CONTINUE) {
                {
                    // See https://developer.android.com/training/articles/smp#nonracing
                    android::base::ScopedLockAssertion lock_assertion(mWorkerLock);
                    if (!mWorkerStateChangeRequest.load(std::memory_order_relaxed)) continue;
                }
                //
                // Pause and resume are synchronous. One worker cycle must complete
                // before the worker indicates a state change. This is how 'mWorkerState' and
                // 'state' interact:
                //
                // mWorkerState == RUNNING
                // client sets mWorkerState := PAUSE_REQUESTED
                // last workerCycle gets executed, state := mWorkerState := PAUSED by us
                //   (or the workers enters the 'error' state if workerCycle fails)
                // client gets notified about state change in any case
                // thread is doing a busy wait while 'state == PAUSED'
                // client sets mWorkerState := RESUME_REQUESTED
                // state := mWorkerState (RESUME_REQUESTED)
                // mWorkerState := RUNNING, but we don't notify the client yet
                // first workerCycle gets executed, the code below triggers a client notification
                //   (or if workerCycle fails, worker enters 'error' state and also notifies)
                // state := mWorkerState (RUNNING)
                std::lock_guard<std::mutex> lock(mWorkerLock);
                if (state == WorkerState::RESUME_REQUESTED) {
                    needToNotify = true;
                }
                state = mWorkerState;
                if (mWorkerState == WorkerState::PAUSE_REQUESTED) {
                    state = mWorkerState = WorkerState::PAUSED;
                    needToNotify = true;
                } else if (mWorkerState == WorkerState::RESUME_REQUESTED) {
                    mWorkerState = WorkerState::RUNNING;
                }
            } else {
                std::lock_guard<std::mutex> lock(mWorkerLock);
                if (state == WorkerState::RESUME_REQUESTED ||
                    mWorkerState == WorkerState::PAUSE_REQUESTED) {
                    needToNotify = true;
                }
                state = mWorkerState = WorkerState::STOPPED;
                if (status == WorkerStatus::ABORT) {
                    mError = "workerCycle aborted";
                }
            }
            if (needToNotify) {
                {
                    std::lock_guard<std::mutex> lock(mWorkerLock);
                    mWorkerStateChangeRequest = false;
                }
                mWorkerCv.notify_one();
            }
        }
    }


    StreamLogic* const mLogic;
    std::string mThreadName;
    std::string mThreadName;
    int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
    int mThreadPriority = ANDROID_PRIORITY_DEFAULT;
    std::thread mWorker;
    std::thread mWorker;
@@ -230,4 +83,71 @@ class StreamWorker {
    std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
    std::atomic<bool> mWorkerStateChangeRequest GUARDED_BY(mWorkerLock) = false;
};
};


}  // namespace internal

class StreamLogic {
  public:
    friend class internal::ThreadController;

    virtual ~StreamLogic() = default;

  protected:
    enum class Status { ABORT, CONTINUE, EXIT };

    /* Called once at the beginning of the thread loop. Must return
     * an empty string to enter the thread loop, otherwise the thread loop
     * exits and the worker switches into the 'error' state, setting
     * the error to the returned value.
     */
    virtual std::string init() = 0;

    /* Called for each thread loop unless the thread is in 'paused' state.
     * Must return 'CONTINUE' to continue running, otherwise the thread loop
     * exits. If the result from worker cycle is 'ABORT' then the worker switches
     * into the 'error' state with a generic error message. It is recommended that
     * the subclass reports any problems via logging facilities. Returning the 'EXIT'
     * status is equivalent to calling 'stop()' method. This is just a way of
     * of stopping the worker by its own initiative.
     */
    virtual Status cycle() = 0;
};

template <class LogicImpl>
class StreamWorker : public LogicImpl {
  public:
    template <class... Args>
    explicit StreamWorker(Args&&... args) : LogicImpl(std::forward<Args>(args)...), mThread(this) {}

    // Methods of LogicImpl are available via inheritance.
    // Forwarded methods of ThreadController follow.

    // Note that 'priority' here is what is known as the 'nice number' in *nix systems.
    // The nice number is used with the default scheduler. For threads that
    // need to use a specialized scheduler (e.g. SCHED_FIFO) and set the priority within it,
    // it is recommended to implement an appropriate configuration sequence within
    // 'LogicImpl' or 'StreamLogic::init'.
    bool start(const std::string& name = "", int priority = ANDROID_PRIORITY_DEFAULT) {
        return mThread.start(name, priority);
    }
    void pause() { mThread.pause(); }
    void resume() { mThread.resume(); }
    bool hasError() { return mThread.hasError(); }
    std::string getError() { return mThread.getError(); }
    void stop() { return mThread.stop(); }
    bool waitForAtLeastOneCycle() { return mThread.waitForAtLeastOneCycle(); }

    // Only used by unit tests.
    void testLockUnlockMutex(bool lock) { mThread.lockUnlockMutex(lock); }
    std::thread::native_handle_type testGetThreadNativeHandle() {
        return mThread.getThreadNativeHandle();
    }

  private:
    // The ThreadController gets destroyed before LogicImpl.
    // After the controller has been destroyed, it is guaranteed that
    // the thread was joined, thus the 'cycle' method of LogicImpl
    // will not be called anymore, and it is safe to destroy LogicImpl.
    internal::ThreadController mThread;
};

}  // namespace android::hardware::audio::common
}  // namespace android::hardware::audio::common
+12 −7
Original line number Original line Diff line number Diff line
@@ -16,6 +16,7 @@


#include <pthread.h>
#include <pthread.h>
#include <sched.h>
#include <sched.h>
#include <sys/resource.h>
#include <unistd.h>
#include <unistd.h>


#include <atomic>
#include <atomic>
@@ -26,18 +27,19 @@
#define LOG_TAG "StreamWorker_Test"
#define LOG_TAG "StreamWorker_Test"
#include <log/log.h>
#include <log/log.h>


using android::hardware::audio::common::StreamLogic;
using android::hardware::audio::common::StreamWorker;
using android::hardware::audio::common::StreamWorker;


class TestWorker : public StreamWorker<TestWorker> {
class TestWorkerLogic : public StreamLogic {
  public:
  public:
    struct Stream {
    struct Stream {
        void setErrorStatus() { status = WorkerStatus::ABORT; }
        void setErrorStatus() { status = Status::ABORT; }
        void setStopStatus() { status = WorkerStatus::EXIT; }
        void setStopStatus() { status = Status::EXIT; }
        std::atomic<WorkerStatus> status = WorkerStatus::CONTINUE;
        std::atomic<Status> status = Status::CONTINUE;
    };
    };


    // Use nullptr to test error reporting from the worker thread.
    // Use nullptr to test error reporting from the worker thread.
    explicit TestWorker(Stream* stream) : mStream(stream) {}
    explicit TestWorkerLogic(Stream* stream) : mStream(stream) {}


    size_t getWorkerCycles() const { return mWorkerCycles; }
    size_t getWorkerCycles() const { return mWorkerCycles; }
    int getPriority() const { return mPriority; }
    int getPriority() const { return mPriority; }
@@ -48,8 +50,10 @@ class TestWorker : public StreamWorker<TestWorker> {
        return mWorkerCycles == cyclesBefore;
        return mWorkerCycles == cyclesBefore;
    }
    }


    std::string workerInit() { return mStream != nullptr ? "" : "Expected error"; }
  protected:
    WorkerStatus workerCycle() {
    // StreamLogic implementation
    std::string init() override { return mStream != nullptr ? "" : "Expected error"; }
    Status cycle() override {
        mPriority = getpriority(PRIO_PROCESS, 0);
        mPriority = getpriority(PRIO_PROCESS, 0);
        do {
        do {
            mWorkerCycles++;
            mWorkerCycles++;
@@ -62,6 +66,7 @@ class TestWorker : public StreamWorker<TestWorker> {
    std::atomic<size_t> mWorkerCycles = 0;
    std::atomic<size_t> mWorkerCycles = 0;
    std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
    std::atomic<int> mPriority = ANDROID_PRIORITY_DEFAULT;
};
};
using TestWorker = StreamWorker<TestWorkerLogic>;


// The parameter specifies whether an extra call to 'stop' is made at the end.
// The parameter specifies whether an extra call to 'stop' is made at the end.
class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
class StreamWorkerInvalidTest : public testing::TestWithParam<bool> {
+1 −0
Original line number Original line Diff line number Diff line
@@ -11,6 +11,7 @@ cc_library_static {
    name: "libaudioserviceexampleimpl",
    name: "libaudioserviceexampleimpl",
    vendor: true,
    vendor: true,
    shared_libs: [
    shared_libs: [
        "libaudioaidlcommon",
        "libbase",
        "libbase",
        "libbinder_ndk",
        "libbinder_ndk",
        "libstagefright_foundation",
        "libstagefright_foundation",
Loading