Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit fe4b88ab authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Add EventFlag for effect HAL thread processing"

parents d10869b2 41888a2f
Loading
Loading
Loading
Loading
+60 −49
Original line number Diff line number Diff line
@@ -14,13 +14,18 @@
 * limitations under the License.
 */

#include <cstddef>
#include <memory>

#define LOG_TAG "AHAL_EffectThread"
#include <android-base/logging.h>
#include <pthread.h>
#include <sys/resource.h>

#include "effect-impl/EffectThread.h"
#include "effect-impl/EffectTypes.h"

using ::android::hardware::EventFlag;

namespace aidl::android::hardware::audio::effect {

@@ -31,23 +36,35 @@ EffectThread::EffectThread() {
EffectThread::~EffectThread() {
    destroyThread();
    LOG(DEBUG) << __func__ << " done";
};
}

RetCode EffectThread::createThread(std::shared_ptr<EffectContext> context, const std::string& name,
                                   int priority, int sleepUs /* kSleepTimeUs */) {
                                   int priority) {
    if (mThread.joinable()) {
        LOG(WARNING) << "-" << mName << "-" << __func__ << " thread already created, no-op";
        LOG(WARNING) << mName << __func__ << " thread already created, no-op";
        return RetCode::SUCCESS;
    }
    mName = name;
    mPriority = priority;
    mSleepTimeUs = sleepUs;
    {
        std::lock_guard lg(mThreadMutex);
        mThreadContext = std::move(context);
        auto statusMQ = mThreadContext->getStatusFmq();
        EventFlag* efGroup = nullptr;
        ::android::status_t status =
                EventFlag::createEventFlag(statusMQ->getEventFlagWord(), &efGroup);
        if (status != ::android::OK || !efGroup) {
            LOG(ERROR) << mName << __func__ << " create EventFlagGroup failed " << status
                       << " efGroup " << efGroup;
            return RetCode::ERROR_THREAD;
        }
        mEfGroup.reset(efGroup);
        // kickoff and wait for commands (CommandId::START/STOP) or IEffect.close from client
        mEfGroup->wake(kEventFlagNotEmpty);
    }

    mThread = std::thread(&EffectThread::threadLoop, this);
    LOG(DEBUG) << "-" << mName << "-" << __func__ << " priority " << mPriority << " done";
    LOG(DEBUG) << mName << __func__ << " priority " << mPriority << " done";
    return RetCode::SUCCESS;
}

@@ -66,37 +83,31 @@ RetCode EffectThread::destroyThread() {
        std::lock_guard lg(mThreadMutex);
        mThreadContext.reset();
    }
    LOG(DEBUG) << "-" << mName << "-" << __func__ << " done";
    LOG(DEBUG) << mName << __func__;
    return RetCode::SUCCESS;
}

RetCode EffectThread::startThread() {
    return handleStartStop(false /* stop */);
}

RetCode EffectThread::stopThread() {
    return handleStartStop(true /* stop */);
    {
        std::lock_guard lg(mThreadMutex);
        mStop = false;
        mCv.notify_one();
    }

RetCode EffectThread::handleStartStop(bool stop) {
    if (!mThread.joinable()) {
        LOG(ERROR) << "-" << mName << "-" << __func__ << ": "
                   << " thread already destroyed";
        return RetCode::ERROR_THREAD;
    mEfGroup->wake(kEventFlagNotEmpty);
    LOG(DEBUG) << mName << __func__;
    return RetCode::SUCCESS;
}

RetCode EffectThread::stopThread() {
    {
        std::lock_guard lg(mThreadMutex);
        if (stop == mStop) {
            LOG(WARNING) << "-" << mName << "-" << __func__ << ": "
                         << " already " << (stop ? "stop" : "start");
            return RetCode::SUCCESS;
        }
        mStop = stop;
        mStop = true;
        mCv.notify_one();
    }

    mCv.notify_one();
    LOG(DEBUG) << ": " << mName << (stop ? " stop done" : " start done");
    mEfGroup->wake(kEventFlagNotEmpty);
    LOG(DEBUG) << mName << __func__;
    return RetCode::SUCCESS;
}

@@ -104,42 +115,42 @@ void EffectThread::threadLoop() {
    pthread_setname_np(pthread_self(), mName.substr(0, kMaxTaskNameLen - 1).c_str());
    setpriority(PRIO_PROCESS, 0, mPriority);
    while (true) {
        /**
         * wait for the EventFlag without lock, it's ok because the mEfGroup pointer will not change
         * in the life cycle of workerThread (threadLoop).
         */
        uint32_t efState = 0;
        mEfGroup->wait(kEventFlagNotEmpty, &efState);

        {
            std::unique_lock l(mThreadMutex);
            ::android::base::ScopedLockAssertion lock_assertion(mThreadMutex);
            mCv.wait(l, [&]() REQUIRES(mThreadMutex) { return mExit || !mStop; });
            if (mExit) {
            LOG(WARNING) << __func__ << " EXIT!";
                LOG(INFO) << __func__ << " EXIT!";
                return;
            }
            process_l();
        }
    }
}

void EffectThread::process_l() {
    RETURN_VALUE_IF(!mThreadContext, void(), "nullContext");
    std::shared_ptr<EffectContext::StatusMQ> statusMQ = mThreadContext->getStatusFmq();
    std::shared_ptr<EffectContext::DataMQ> inputMQ = mThreadContext->getInputDataFmq();
    std::shared_ptr<EffectContext::DataMQ> outputMQ = mThreadContext->getOutputDataFmq();
    auto buffer = mThreadContext->getWorkBuffer();

    // Only this worker will read from input data MQ and write to output data MQ.
    auto readSamples = inputMQ->availableToRead(), writeSamples = outputMQ->availableToWrite();
    if (readSamples && writeSamples) {
        auto processSamples = std::min(readSamples, writeSamples);
        LOG(DEBUG) << "-" << mName << "-" << __func__ << ": "
                   << " available to read " << readSamples << " available to write " << writeSamples
                   << " process " << processSamples;
    auto statusMQ = mThreadContext->getStatusFmq();
    auto inputMQ = mThreadContext->getInputDataFmq();
    auto outputMQ = mThreadContext->getOutputDataFmq();
    auto buffer = mThreadContext->getWorkBuffer();

    auto processSamples = inputMQ->availableToRead();
    if (processSamples) {
        inputMQ->read(buffer, processSamples);

        IEffect::Status status = effectProcessImpl(buffer, buffer, processSamples);
        outputMQ->write(buffer, status.fmqProduced);
        statusMQ->writeBlocking(&status, 1);
        LOG(DEBUG) << "-" << mName << "-" << __func__ << ": "
                   << " done processing, effect consumed " << status.fmqConsumed << " produced "
                   << status.fmqProduced;
    } else {
        usleep(mSleepTimeUs);
        LOG(DEBUG) << mName << __func__ << ": done processing, effect consumed "
                   << status.fmqConsumed << " produced " << status.fmqProduced;
    }
}

+2 −1
Original line number Diff line number Diff line
@@ -54,6 +54,7 @@ class EffectContext {
        size_t inBufferSizeInFloat = input.frameCount * mInputFrameSize / sizeof(float);
        size_t outBufferSizeInFloat = output.frameCount * mOutputFrameSize / sizeof(float);

        // only status FMQ use the EventFlag
        mStatusMQ = std::make_shared<StatusMQ>(statusDepth, true /*configureEventFlagWord*/);
        mInputMQ = std::make_shared<DataMQ>(inBufferSizeInFloat);
        mOutputMQ = std::make_shared<DataMQ>(outBufferSizeInFloat);
@@ -127,7 +128,7 @@ class EffectContext {
        return RetCode::SUCCESS;
    }
    virtual Parameter::Common getCommon() {
        LOG(INFO) << __func__ << mCommon.toString();
        LOG(DEBUG) << __func__ << mCommon.toString();
        return mCommon;
    }

+14 −6
Original line number Diff line number Diff line
@@ -16,10 +16,12 @@

#pragma once
#include <atomic>
#include <memory>
#include <string>
#include <thread>

#include <android-base/thread_annotations.h>
#include <fmq/EventFlag.h>
#include <system/thread_defs.h>

#include "effect-impl/EffectContext.h"
@@ -35,7 +37,7 @@ class EffectThread {

    // called by effect implementation.
    RetCode createThread(std::shared_ptr<EffectContext> context, const std::string& name,
                         int priority = ANDROID_PRIORITY_URGENT_AUDIO, int sleepUs = kSleepTimeUs);
                         int priority = ANDROID_PRIORITY_URGENT_AUDIO);
    RetCode destroyThread();
    RetCode startThread();
    RetCode stopThread();
@@ -73,17 +75,23 @@ class EffectThread {

  private:
    static constexpr int kMaxTaskNameLen = 15;
    static constexpr int kSleepTimeUs = 2000;  // in micro-second

    std::mutex mThreadMutex;
    std::condition_variable mCv;
    bool mExit GUARDED_BY(mThreadMutex) = false;
    bool mStop GUARDED_BY(mThreadMutex) = true;
    bool mExit GUARDED_BY(mThreadMutex) = false;
    std::shared_ptr<EffectContext> mThreadContext GUARDED_BY(mThreadMutex);

    struct EventFlagDeleter {
        void operator()(::android::hardware::EventFlag* flag) const {
            if (flag) {
                ::android::hardware::EventFlag::deleteEventFlag(&flag);
            }
        }
    };
    std::unique_ptr<::android::hardware::EventFlag, EventFlagDeleter> mEfGroup;
    std::thread mThread;
    int mPriority;
    int mSleepTimeUs = kSleepTimeUs;  // sleep time in micro-second
    std::string mName;

    RetCode handleStartStop(bool stop);
};
}  // namespace aidl::android::hardware::audio::effect
+13 −3
Original line number Diff line number Diff line
@@ -41,6 +41,7 @@ using namespace android;
using aidl::android::hardware::audio::effect::CommandId;
using aidl::android::hardware::audio::effect::Descriptor;
using aidl::android::hardware::audio::effect::IEffect;
using aidl::android::hardware::audio::effect::kEventFlagNotEmpty;
using aidl::android::hardware::audio::effect::Parameter;
using aidl::android::hardware::audio::effect::Range;
using aidl::android::hardware::audio::effect::State;
@@ -50,6 +51,7 @@ using aidl::android::media::audio::common::AudioFormatDescription;
using aidl::android::media::audio::common::AudioFormatType;
using aidl::android::media::audio::common::AudioUuid;
using aidl::android::media::audio::common::PcmType;
using ::android::hardware::EventFlag;

const AudioFormatDescription kDefaultFormatDescription = {
        .type = AudioFormatType::PCM, .pcm = PcmType::FLOAT_32_BIT, .encoding = ""};
@@ -145,12 +147,20 @@ class EffectHelper {
        buffer.resize(floatsToWrite);
        std::fill(buffer.begin(), buffer.end(), 0x5a);
    }
    static void writeToFmq(std::unique_ptr<DataMQ>& mq, const std::vector<float>& buffer) {
        const size_t available = mq->availableToWrite();
    static void writeToFmq(std::unique_ptr<StatusMQ>& statusMq, std::unique_ptr<DataMQ>& dataMq,
                           const std::vector<float>& buffer) {
        const size_t available = dataMq->availableToWrite();
        ASSERT_NE(0Ul, available);
        auto bufferFloats = buffer.size();
        auto floatsToWrite = std::min(available, bufferFloats);
        ASSERT_TRUE(mq->write(buffer.data(), floatsToWrite));
        ASSERT_TRUE(dataMq->write(buffer.data(), floatsToWrite));

        EventFlag* efGroup;
        ASSERT_EQ(::android::OK,
                  EventFlag::createEventFlag(statusMq->getEventFlagWord(), &efGroup));
        ASSERT_NE(nullptr, efGroup);
        efGroup->wake(kEventFlagNotEmpty);
        ASSERT_EQ(::android::OK, EventFlag::deleteEventFlag(&efGroup));
    }
    static void readFromFmq(std::unique_ptr<StatusMQ>& statusMq, size_t statusNum,
                            std::unique_ptr<DataMQ>& dataMq, size_t expectFloats,
+10 −10
Original line number Diff line number Diff line
@@ -597,7 +597,7 @@ TEST_P(AudioEffectTest, ConsumeDataInProcessingState) {

    std::vector<float> buffer;
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

@@ -636,7 +636,7 @@ TEST_P(AudioEffectTest, ConsumeDataAfterRestart) {
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));

    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

@@ -666,7 +666,7 @@ TEST_P(AudioEffectTest, SendDataAtIdleAndConsumeDataInProcessing) {

    std::vector<float> buffer;
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ, inputMQ, buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
@@ -699,7 +699,7 @@ TEST_P(AudioEffectTest, ProcessDataMultipleTimes) {

    std::vector<float> buffer;
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
@@ -708,7 +708,7 @@ TEST_P(AudioEffectTest, ProcessDataMultipleTimes) {
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

@@ -740,13 +740,13 @@ TEST_P(AudioEffectTest, ConsumeDataAndRestart) {
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
    std::vector<float> buffer;
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
@@ -781,7 +781,7 @@ TEST_P(AudioEffectTest, NotConsumeDataByClosedEffect) {

    std::vector<float> buffer;
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));

    ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
@@ -816,7 +816,7 @@ TEST_P(AudioEffectTest, ConsumeDataMultipleEffects) {

    std::vector<float> buffer1, buffer2;
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common1, inputMQ1, buffer1));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ1, buffer1));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ1, inputMQ1, buffer1));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ1, 1, outputMQ1, buffer1.size(), buffer1));

@@ -827,7 +827,7 @@ TEST_P(AudioEffectTest, ConsumeDataMultipleEffects) {
    auto outputMQ2 = std::make_unique<EffectHelper::DataMQ>(ret2.outputDataMQ);
    ASSERT_TRUE(outputMQ2->isValid());
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common2, inputMQ2, buffer2));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ2, buffer2));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(statusMQ2, inputMQ2, buffer2));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ2, 1, outputMQ2, buffer2.size(), buffer2));