Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b49631f4 authored by Shunkai Yao's avatar Shunkai Yao
Browse files

Effect AIDL: Move EffectThread process into mutex

To avoid the case of receive thread stop right before process, in this
case test case AudioEffectTest#ConsumeDataAfterRestart will fail.

Bug: 264618800
Test: atest VtsHalAudioEffectTargetTest
Change-Id: I3c00361a537bc7010e6cd138f637f68b963e8033
parent 1d2c037e
Loading
Loading
Loading
Loading
+22 −44
Original line number Diff line number Diff line
@@ -70,26 +70,14 @@ RetCode EffectThread::destroyThread() {
}

RetCode EffectThread::startThread() {
    if (!mThread.joinable()) {
        LOG(ERROR) << __func__ << " thread already destroyed";
        return RetCode::ERROR_THREAD;
    }

    {
        std::lock_guard lg(mThreadMutex);
        if (!mStop) {
            LOG(WARNING) << __func__ << " already start";
            return RetCode::SUCCESS;
        }
        mStop = false;
    return handleStartStop(false /* stop */);
}

    mCv.notify_one();
    LOG(DEBUG) << __func__ << " done";
    return RetCode::SUCCESS;
RetCode EffectThread::stopThread() {
    return handleStartStop(true /* stop */);
}

RetCode EffectThread::stopThread() {
RetCode EffectThread::handleStartStop(bool stop) {
    if (!mThread.joinable()) {
        LOG(ERROR) << __func__ << " thread already destroyed";
        return RetCode::ERROR_THREAD;
@@ -97,13 +85,15 @@ RetCode EffectThread::stopThread() {

    {
        std::lock_guard lg(mThreadMutex);
        if (mStop) {
            LOG(WARNING) << __func__ << " already stop";
        if (stop == mStop) {
            LOG(WARNING) << __func__ << " already " << stop ? "stop" : "start";
            return RetCode::SUCCESS;
        }
        mStop = true;
        mStop = stop;
    }
    LOG(DEBUG) << __func__ << " done";

    mCv.notify_one();
    LOG(DEBUG) << stop ? "stop done" : "start done";
    return RetCode::SUCCESS;
}

@@ -111,34 +101,23 @@ void EffectThread::threadLoop() {
    pthread_setname_np(pthread_self(), mName.substr(0, kMaxTaskNameLen - 1).c_str());
    setpriority(PRIO_PROCESS, 0, mPriority);
    while (true) {
        bool needExit = false;
        {
        std::unique_lock l(mThreadMutex);
            mCv.wait(l, [&]() REQUIRES(mThreadMutex) {
                needExit = mExit;
                return mExit || !mStop;
            });
        }
        if (needExit) {
        ::android::base::ScopedLockAssertion lock_assertion(mThreadMutex);
        mCv.wait(l, [&]() REQUIRES(mThreadMutex) { return mExit || !mStop; });
        if (mExit) {
            LOG(WARNING) << __func__ << " EXIT!";
            return;
        }

        process();
        process_l();
    }
}

void EffectThread::process() {
    std::shared_ptr<EffectContext> context;
    {
        std::lock_guard lg(mThreadMutex);
        context = mThreadContext;
        RETURN_VALUE_IF(!context, void(), "nullContext");
    }
    std::shared_ptr<EffectContext::StatusMQ> statusMQ = context->getStatusFmq();
    std::shared_ptr<EffectContext::DataMQ> inputMQ = context->getInputDataFmq();
    std::shared_ptr<EffectContext::DataMQ> outputMQ = context->getOutputDataFmq();
    auto buffer = context->getWorkBuffer();
void EffectThread::process_l() {
    RETURN_VALUE_IF(!mThreadContext, void(), "nullContext");
    std::shared_ptr<EffectContext::StatusMQ> statusMQ = mThreadContext->getStatusFmq();
    std::shared_ptr<EffectContext::DataMQ> inputMQ = mThreadContext->getInputDataFmq();
    std::shared_ptr<EffectContext::DataMQ> outputMQ = mThreadContext->getOutputDataFmq();
    auto buffer = mThreadContext->getWorkBuffer();

    // Only this worker will read from input data MQ and write to output data MQ.
    auto readSamples = inputMQ->availableToRead(), writeSamples = outputMQ->availableToWrite();
@@ -149,7 +128,6 @@ void EffectThread::process() {

        inputMQ->read(buffer, processSamples);

        // call effectProcessImpl without lock
        IEffect::Status status = effectProcessImpl(buffer, buffer, processSamples);
        outputMQ->write(buffer, status.fmqProduced);
        statusMQ->writeBlocking(&status, 1);
+9 −9
Original line number Diff line number Diff line
@@ -54,6 +54,9 @@ class EffectThread {
     * EffectThread will make sure effectProcessImpl only be called after startThread() successful
     * and before stopThread() successful.
     *
     * effectProcessImpl implementation must not call any EffectThread interface, otherwise it will
     * cause deadlock.
     *
     * @param in address of input float buffer.
     * @param out address of output float buffer.
     * @param samples number of samples to process.
@@ -62,16 +65,11 @@ class EffectThread {
    virtual IEffect::Status effectProcessImpl(float* in, float* out, int samples) = 0;

    /**
     * The default EffectThread::process() implementation doesn't need to lock. It will only
     * access the FMQ and mWorkBuffer in  EffectContext, since they will only be changed in
     * EffectImpl IEffect::open() (in this case EffectThread just created and not running yet) and
     * IEffect::command(CommandId::RESET) (in this case EffectThread already stopped).
     *
     * process() call effectProcessImpl for effect processing, and because effectProcessImpl is
     * implemented by effects, process() must not hold lock before call into effectProcessImpl to
     * avoid deadlock.
     * process() call effectProcessImpl() for effect data processing, it is necessary for the
     * processing to be called under Effect thread mutex mThreadMutex, to avoid the effect state
     * change before/during data processing, and keep the thread and effect state consistent.
     */
    virtual void process();
    virtual void process_l() REQUIRES(mThreadMutex);

  private:
    const int kMaxTaskNameLen = 15;
@@ -83,5 +81,7 @@ class EffectThread {
    std::thread mThread;
    int mPriority;
    std::string mName;

    RetCode handleStartStop(bool stop);
};
}  // namespace aidl::android::hardware::audio::effect
+14 −8
Original line number Diff line number Diff line
@@ -148,18 +148,24 @@ class EffectHelper {
    }
    static void readFromFmq(std::unique_ptr<StatusMQ>& statusMq, size_t statusNum,
                            std::unique_ptr<DataMQ>& dataMq, size_t expectFloats,
                            std::vector<float>& buffer) {
                            std::vector<float>& buffer,
                            std::optional<int> expectStatus = STATUS_OK) {
        if (0 == statusNum) {
            ASSERT_EQ(0ul, statusMq->availableToRead());
            return;
        }
        IEffect::Status status{};
        ASSERT_TRUE(statusMq->readBlocking(&status, statusNum));
        ASSERT_EQ(STATUS_OK, status.status);
        if (statusNum != 0) {
        if (expectStatus.has_value()) {
            ASSERT_EQ(expectStatus.value(), status.status);
        }

        ASSERT_EQ(expectFloats, (unsigned)status.fmqProduced);
        ASSERT_EQ(expectFloats, dataMq->availableToRead());
        if (expectFloats != 0) {
            ASSERT_TRUE(dataMq->read(buffer.data(), expectFloats));
        }
    }
    }
    static Parameter::Common createParamCommon(
            int session = 0, int ioHandle = -1, int iSampleRate = 48000, int oSampleRate = 48000,
            long iFrameCount = 0x100, long oFrameCount = 0x100,
+49 −38
Original line number Diff line number Diff line
@@ -590,12 +590,14 @@ TEST_P(AudioEffectTest, ConsumeDataInProcessingState) {
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));

    std::vector<float> buffer;
    EffectHelper::allocateInputData(common, inputMQ, buffer);
    EffectHelper::writeToFmq(inputMQ, buffer);
    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));

    ASSERT_NO_FATAL_FAILURE(close(mEffect));
    ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
@@ -617,20 +619,24 @@ TEST_P(AudioEffectTest, ConsumeDataAfterRestart) {
    auto outputMQ = std::make_unique<EffectHelper::DataMQ>(ret.outputDataMQ);
    ASSERT_TRUE(outputMQ->isValid());

    std::vector<float> buffer;
    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 0, outputMQ, buffer.size(), buffer));
    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));

    std::vector<float> buffer;
    EffectHelper::allocateInputData(common, inputMQ, buffer);
    EffectHelper::writeToFmq(inputMQ, buffer);
    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));

    ASSERT_NO_FATAL_FAILURE(close(mEffect));
    ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
@@ -653,14 +659,14 @@ TEST_P(AudioEffectTest, SendDataAtIdleAndConsumeDataInProcessing) {
    ASSERT_TRUE(outputMQ->isValid());

    std::vector<float> buffer;
    EffectHelper::allocateInputData(common, inputMQ, buffer);
    EffectHelper::writeToFmq(inputMQ, buffer);
    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));

    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
@@ -686,31 +692,30 @@ TEST_P(AudioEffectTest, ProcessDataMultipleTimes) {
    ASSERT_TRUE(outputMQ->isValid());

    std::vector<float> buffer;
    EffectHelper::allocateInputData(common, inputMQ, buffer);
    EffectHelper::writeToFmq(inputMQ, buffer);
    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));

    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
    // expect no status and data after consume
    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

    EffectHelper::writeToFmq(inputMQ, buffer);
    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
    // expect no status and data after consume
    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));

    ASSERT_NO_FATAL_FAILURE(close(mEffect));
    ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
}

// Send data to IDLE state effects and expect it not be consumed.
TEST_P(AudioEffectTest, NotConsumeDataInIdleState) {
// Send data to processing state effects, stop, and restart.
TEST_P(AudioEffectTest, ConsumeDataAndRestart) {
    ASSERT_NO_FATAL_FAILURE(create(mFactory, mEffect, mDescriptor));

    Parameter::Common common = EffectHelper::createParamCommon(
@@ -727,17 +732,21 @@ TEST_P(AudioEffectTest, NotConsumeDataInIdleState) {

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
    std::vector<float> buffer;
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));

    std::vector<float> buffer;
    EffectHelper::allocateInputData(common, inputMQ, buffer);
    EffectHelper::writeToFmq(inputMQ, buffer);
    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::START));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::PROCESSING));
    EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer);
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ, 1, outputMQ, buffer.size(), buffer));

    ASSERT_NO_FATAL_FAILURE(command(mEffect, CommandId::STOP));
    ASSERT_NO_FATAL_FAILURE(expectState(mEffect, State::IDLE));
@@ -765,9 +774,9 @@ TEST_P(AudioEffectTest, NotConsumeDataByClosedEffect) {
    ASSERT_TRUE(outputMQ->isValid());

    std::vector<float> buffer;
    EffectHelper::allocateInputData(common, inputMQ, buffer);
    EffectHelper::writeToFmq(inputMQ, buffer);
    EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer);
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common, inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ, buffer));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::readFromFmq(statusMQ, 0, outputMQ, 0, buffer));

    ASSERT_NO_FATAL_FAILURE(destroy(mFactory, mEffect));
}
@@ -800,9 +809,10 @@ TEST_P(AudioEffectTest, ConsumeDataMultipleEffects) {
    ASSERT_TRUE(outputMQ1->isValid());

    std::vector<float> buffer1, buffer2;
    EffectHelper::allocateInputData(common1, inputMQ1, buffer1);
    EffectHelper::writeToFmq(inputMQ1, buffer1);
    EffectHelper::readFromFmq(statusMQ1, 1, outputMQ1, buffer1.size(), buffer1);
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common1, inputMQ1, buffer1));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ1, buffer1));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ1, 1, outputMQ1, buffer1.size(), buffer1));

    auto statusMQ2 = std::make_unique<EffectHelper::StatusMQ>(ret2.statusMQ);
    ASSERT_TRUE(statusMQ2->isValid());
@@ -810,9 +820,10 @@ TEST_P(AudioEffectTest, ConsumeDataMultipleEffects) {
    ASSERT_TRUE(inputMQ2->isValid());
    auto outputMQ2 = std::make_unique<EffectHelper::DataMQ>(ret2.outputDataMQ);
    ASSERT_TRUE(outputMQ2->isValid());
    EffectHelper::allocateInputData(common2, inputMQ2, buffer2);
    EffectHelper::writeToFmq(inputMQ2, buffer2);
    EffectHelper::readFromFmq(statusMQ2, 1, outputMQ2, buffer2.size(), buffer2);
    EXPECT_NO_FATAL_FAILURE(EffectHelper::allocateInputData(common2, inputMQ2, buffer2));
    EXPECT_NO_FATAL_FAILURE(EffectHelper::writeToFmq(inputMQ2, buffer2));
    EXPECT_NO_FATAL_FAILURE(
            EffectHelper::readFromFmq(statusMQ2, 1, outputMQ2, buffer2.size(), buffer2));

    ASSERT_NO_FATAL_FAILURE(command(effect1, CommandId::STOP));
    ASSERT_NO_FATAL_FAILURE(expectState(effect1, State::IDLE));