Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2aab766d authored by Mikhail Naganov's avatar Mikhail Naganov
Browse files

audio: Fix default remote submix HAL implementation and VTS

The implementation had duplicated code in 'transfer', which already
present in 'outWrite'.

Cleaned up delay calculations and logging.

Fixed the VTS to send 'prepareToClose' before attempting to join
the worker. Otherwise, the worker could be stuck on a blocking
operation due to inactivity of the other party and join would never
happen.

Bug: 302132812
Test: atest VtsHalAudioCoreTargetTest --test-filter="*AudioModuleRemoteSubmix*"
Change-Id: Id8455eb12d1d2999dc0bc7b64f0d70a61a177598
parent 3c8b6ce1
Loading
Loading
Loading
Loading
+1 −1
Original line number Original line Diff line number Diff line
@@ -46,7 +46,7 @@ class StreamRemoteSubmix : public StreamCommonImpl {
    ndk::ScopedAStatus prepareToClose() override;
    ndk::ScopedAStatus prepareToClose() override;


  private:
  private:
    size_t getPipeSizeInFrames();
    long getDelayInUsForFrameCount(size_t frameCount);
    size_t getStreamPipeSizeInFrames();
    size_t getStreamPipeSizeInFrames();
    ::android::status_t outWrite(void* buffer, size_t frameCount, size_t* actualFrameCount);
    ::android::status_t outWrite(void* buffer, size_t frameCount, size_t* actualFrameCount);
    ::android::status_t inRead(void* buffer, size_t frameCount, size_t* actualFrameCount);
    ::android::status_t inRead(void* buffer, size_t frameCount, size_t* actualFrameCount);
+30 −40
Original line number Original line Diff line number Diff line
@@ -17,8 +17,6 @@
#define LOG_TAG "AHAL_StreamRemoteSubmix"
#define LOG_TAG "AHAL_StreamRemoteSubmix"
#include <android-base/logging.h>
#include <android-base/logging.h>


#include <cmath>

#include "core-impl/StreamRemoteSubmix.h"
#include "core-impl/StreamRemoteSubmix.h"


using aidl::android::hardware::audio::common::SinkMetadata;
using aidl::android::hardware::audio::common::SinkMetadata;
@@ -158,27 +156,8 @@ void StreamRemoteSubmix::shutdown() {


::android::status_t StreamRemoteSubmix::transfer(void* buffer, size_t frameCount,
::android::status_t StreamRemoteSubmix::transfer(void* buffer, size_t frameCount,
                                                 size_t* actualFrameCount, int32_t* latencyMs) {
                                                 size_t* actualFrameCount, int32_t* latencyMs) {
    *latencyMs = (getStreamPipeSizeInFrames() * MILLIS_PER_SECOND) / mStreamConfig.sampleRate;
    *latencyMs = getDelayInUsForFrameCount(getStreamPipeSizeInFrames()) / 1000;
    LOG(VERBOSE) << __func__ << ": Latency " << *latencyMs << "ms";
    LOG(VERBOSE) << __func__ << ": Latency " << *latencyMs << "ms";

    sp<MonoPipe> sink = mCurrentRoute->getSink();
    if (sink != nullptr) {
        if (sink->isShutdown()) {
            sink.clear();
            LOG(VERBOSE) << __func__ << ": pipe shutdown, ignoring the transfer.";
            // the pipe has already been shutdown, this buffer will be lost but we must simulate
            // timing so we don't drain the output faster than realtime
            const size_t delayUs = static_cast<size_t>(
                    std::roundf(frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate));
            usleep(delayUs);

            *actualFrameCount = frameCount;
            return ::android::OK;
        }
    } else {
        LOG(ERROR) << __func__ << ": transfer without a pipe!";
        return ::android::UNEXPECTED_NULL;
    }
    mCurrentRoute->exitStandby(mIsInput);
    mCurrentRoute->exitStandby(mIsInput);
    return (mIsInput ? inRead(buffer, frameCount, actualFrameCount)
    return (mIsInput ? inRead(buffer, frameCount, actualFrameCount)
                     : outWrite(buffer, frameCount, actualFrameCount));
                     : outWrite(buffer, frameCount, actualFrameCount));
@@ -202,6 +181,10 @@ void StreamRemoteSubmix::shutdown() {
    return ::android::OK;
    return ::android::OK;
}
}


long StreamRemoteSubmix::getDelayInUsForFrameCount(size_t frameCount) {
    return frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate;
}

// Calculate the maximum size of the pipe buffer in frames for the specified stream.
// Calculate the maximum size of the pipe buffer in frames for the specified stream.
size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
    auto pipeConfig = mCurrentRoute->mPipeConfig;
    auto pipeConfig = mCurrentRoute->mPipeConfig;
@@ -215,11 +198,11 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
    if (sink != nullptr) {
    if (sink != nullptr) {
        if (sink->isShutdown()) {
        if (sink->isShutdown()) {
            sink.clear();
            sink.clear();
            LOG(VERBOSE) << __func__ << ": pipe shutdown, ignoring the write.";
            const auto delayUs = getDelayInUsForFrameCount(frameCount);
            LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write, sleeping for "
                       << delayUs << " us";
            // the pipe has already been shutdown, this buffer will be lost but we must
            // the pipe has already been shutdown, this buffer will be lost but we must
            // simulate timing so we don't drain the output faster than realtime
            // simulate timing so we don't drain the output faster than realtime
            const size_t delayUs = static_cast<size_t>(
                    std::roundf(frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate));
            usleep(delayUs);
            usleep(delayUs);
            *actualFrameCount = frameCount;
            *actualFrameCount = frameCount;
            return ::android::OK;
            return ::android::OK;
@@ -229,16 +212,17 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
        return ::android::UNKNOWN_ERROR;
        return ::android::UNKNOWN_ERROR;
    }
    }


    const size_t availableToWrite = sink->availableToWrite();
    const bool shouldBlockWrite = mCurrentRoute->shouldBlockWrite();
    size_t availableToWrite = sink->availableToWrite();
    // NOTE: sink has been checked above and sink and source life cycles are synchronized
    // NOTE: sink has been checked above and sink and source life cycles are synchronized
    sp<MonoPipeReader> source = mCurrentRoute->getSource();
    sp<MonoPipeReader> source = mCurrentRoute->getSource();
    // If the write to the sink should be blocked, flush enough frames from the pipe to make space
    // If the write to the sink should be blocked, flush enough frames from the pipe to make space
    // to write the most recent data.
    // to write the most recent data.
    if (!mCurrentRoute->shouldBlockWrite() && availableToWrite < frameCount) {
    if (!shouldBlockWrite && availableToWrite < frameCount) {
        static uint8_t flushBuffer[64];
        static uint8_t flushBuffer[64];
        const size_t flushBufferSizeFrames = sizeof(flushBuffer) / mStreamConfig.frameSize;
        const size_t flushBufferSizeFrames = sizeof(flushBuffer) / mStreamConfig.frameSize;
        size_t framesToFlushFromSource = frameCount - availableToWrite;
        size_t framesToFlushFromSource = frameCount - availableToWrite;
        LOG(VERBOSE) << __func__ << ": flushing " << framesToFlushFromSource
        LOG(DEBUG) << __func__ << ": flushing " << framesToFlushFromSource
                   << " frames from the pipe to avoid blocking";
                   << " frames from the pipe to avoid blocking";
        while (framesToFlushFromSource) {
        while (framesToFlushFromSource) {
            const size_t flushSize = std::min(framesToFlushFromSource, flushBufferSizeFrames);
            const size_t flushSize = std::min(framesToFlushFromSource, flushBufferSizeFrames);
@@ -247,7 +231,12 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
            source->read(flushBuffer, flushSize);
            source->read(flushBuffer, flushSize);
        }
        }
    }
    }
    availableToWrite = sink->availableToWrite();


    if (!shouldBlockWrite && frameCount > availableToWrite) {
        // Truncate the request to avoid blocking.
        frameCount = availableToWrite;
    }
    ssize_t writtenFrames = sink->write(buffer, frameCount);
    ssize_t writtenFrames = sink->write(buffer, frameCount);
    if (writtenFrames < 0) {
    if (writtenFrames < 0) {
        if (writtenFrames == (ssize_t)::android::NEGOTIATE) {
        if (writtenFrames == (ssize_t)::android::NEGOTIATE) {
@@ -261,7 +250,6 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
            writtenFrames = sink->write(buffer, frameCount);
            writtenFrames = sink->write(buffer, frameCount);
        }
        }
    }
    }
    sink.clear();


    if (writtenFrames < 0) {
    if (writtenFrames < 0) {
        LOG(ERROR) << __func__ << ": failed writing to pipe with " << writtenFrames;
        LOG(ERROR) << __func__ << ": failed writing to pipe with " << writtenFrames;
@@ -286,8 +274,9 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
        } else {
        } else {
            LOG(ERROR) << __func__ << ": Read errors " << readErrorCount;
            LOG(ERROR) << __func__ << ": Read errors " << readErrorCount;
        }
        }
        const size_t delayUs = static_cast<size_t>(
        const auto delayUs = getDelayInUsForFrameCount(frameCount);
                std::roundf(frameCount * MICROS_PER_SECOND / mStreamConfig.sampleRate));
        LOG(DEBUG) << __func__ << ": no source, ignoring the read, sleeping for " << delayUs
                   << " us";
        usleep(delayUs);
        usleep(delayUs);
        memset(buffer, 0, mStreamConfig.frameSize * frameCount);
        memset(buffer, 0, mStreamConfig.frameSize * frameCount);
        *actualFrameCount = frameCount;
        *actualFrameCount = frameCount;
@@ -296,7 +285,7 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {


    // read the data from the pipe
    // read the data from the pipe
    int attempts = 0;
    int attempts = 0;
    const size_t delayUs = static_cast<size_t>(std::roundf(kReadAttemptSleepUs));
    const long delayUs = kReadAttemptSleepUs;
    char* buff = (char*)buffer;
    char* buff = (char*)buffer;
    size_t remainingFrames = frameCount;
    size_t remainingFrames = frameCount;
    int availableToRead = source->availableToRead();
    int availableToRead = source->availableToRead();
@@ -317,7 +306,8 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
        } else {
        } else {
            attempts++;
            attempts++;
            LOG(WARNING) << __func__ << ": read returned " << framesRead
            LOG(WARNING) << __func__ << ": read returned " << framesRead
                         << " , read failure attempts = " << attempts;
                         << " , read failure attempts = " << attempts << ", sleeping for "
                         << delayUs << " us";
            usleep(delayUs);
            usleep(delayUs);
        }
        }
    }
    }
@@ -337,18 +327,18 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
    // compute how much we need to sleep after reading the data by comparing the wall clock with
    // compute how much we need to sleep after reading the data by comparing the wall clock with
    //   the projected time at which we should return.
    //   the projected time at which we should return.
    // wall clock after reading from the pipe
    // wall clock after reading from the pipe
    auto recordDurationUs = std::chrono::steady_clock::now() - mCurrentRoute->getRecordStartTime();
    auto recordDurationUs = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now() - mCurrentRoute->getRecordStartTime());


    // readCounterFrames contains the number of frames that have been read since the beginning of
    // readCounterFrames contains the number of frames that have been read since the beginning of
    // recording (including this call): it's converted to usec and compared to how long we've been
    // recording (including this call): it's converted to usec and compared to how long we've been
    // recording for, which gives us how long we must wait to sync the projected recording time, and
    // recording for, which gives us how long we must wait to sync the projected recording time, and
    // the observed recording time.
    // the observed recording time.
    const int projectedVsObservedOffsetUs =
    const long projectedVsObservedOffsetUs =
            std::roundf((readCounterFrames * MICROS_PER_SECOND / mStreamConfig.sampleRate) -
            getDelayInUsForFrameCount(readCounterFrames) - recordDurationUs.count();
                        recordDurationUs.count());


    LOG(VERBOSE) << __func__ << ": record duration " << recordDurationUs.count()
    LOG(VERBOSE) << __func__ << ": record duration " << recordDurationUs.count()
                 << " microseconds, will wait: " << projectedVsObservedOffsetUs << " microseconds";
                 << " us, will wait: " << projectedVsObservedOffsetUs << " us";
    if (projectedVsObservedOffsetUs > 0) {
    if (projectedVsObservedOffsetUs > 0) {
        usleep(projectedVsObservedOffsetUs);
        usleep(projectedVsObservedOffsetUs);
    }
    }
+5 −2
Original line number Original line Diff line number Diff line
@@ -14,16 +14,19 @@
 * limitations under the License.
 * limitations under the License.
 */
 */


#pragma once

#include <chrono>
#include <mutex>
#include <mutex>


#include <android-base/thread_annotations.h>
#include <audio_utils/clock.h>
#include <audio_utils/clock.h>


#include <media/nbaio/MonoPipe.h>
#include <media/nbaio/MonoPipe.h>
#include <media/nbaio/MonoPipeReader.h>
#include <media/nbaio/MonoPipeReader.h>


#include <aidl/android/media/audio/common/AudioChannelLayout.h>
#include <aidl/android/media/audio/common/AudioChannelLayout.h>

#include <aidl/android/media/audio/common/AudioFormatDescription.h>
#include "core-impl/Stream.h"


using aidl::android::media::audio::common::AudioChannelLayout;
using aidl::android::media::audio::common::AudioChannelLayout;
using aidl::android::media::audio::common::AudioFormatDescription;
using aidl::android::media::audio::common::AudioFormatDescription;
+46 −29
Original line number Original line Diff line number Diff line
@@ -1112,6 +1112,7 @@ class DefaultStreamCallback : public ::aidl::android::hardware::audio::core::BnS
template <typename T>
template <typename T>
struct IOTraits {
struct IOTraits {
    static constexpr bool is_input = std::is_same_v<T, IStreamIn>;
    static constexpr bool is_input = std::is_same_v<T, IStreamIn>;
    static constexpr const char* directionStr = is_input ? "input" : "output";
    using Worker = std::conditional_t<is_input, StreamReader, StreamWriter>;
    using Worker = std::conditional_t<is_input, StreamReader, StreamWriter>;
};
};


@@ -4289,22 +4290,41 @@ class WithRemoteSubmix {
        ASSERT_NO_FATAL_FAILURE(SetUpPortConnection(module, moduleConfig));
        ASSERT_NO_FATAL_FAILURE(SetUpPortConnection(module, moduleConfig));
        SetUp(module, moduleConfig, mConnectedPort->get());
        SetUp(module, moduleConfig, mConnectedPort->get());
    }
    }
    void sendBurstCommands() {

    void sendBurstCommandsStartWorker() {
        const StreamContext* context = mStream->getContext();
        const StreamContext* context = mStream->getContext();
        StreamLogicDefaultDriver driver(makeBurstCommands(true), context->getFrameSizeBytes());
        mWorkerDriver = std::make_unique<StreamLogicDefaultDriver>(makeBurstCommands(true),
        typename IOTraits<Stream>::Worker worker(*context, &driver, mStream->getEventReceiver());
                                                                   context->getFrameSizeBytes());
        mWorker = std::make_unique<typename IOTraits<Stream>::Worker>(*context, mWorkerDriver.get(),
                                                                      mStream->getEventReceiver());


        LOG(DEBUG) << __func__ << ": starting worker...";
        LOG(DEBUG) << __func__ << ": starting " << IOTraits<Stream>::directionStr << " worker...";
        ASSERT_TRUE(worker.start());
        ASSERT_TRUE(mWorker->start());
        LOG(DEBUG) << __func__ << ": joining worker...";
    }
        worker.join();

        EXPECT_FALSE(worker.hasError()) << worker.getError();
    void sendBurstCommandsJoinWorker() {
        EXPECT_EQ("", driver.getUnexpectedStateTransition());
        // Must call 'prepareToClose' before attempting to join because the stream may be
        // stuck due to absence of activity from the other side of the remote submix pipe.
        std::shared_ptr<IStreamCommon> common;
        ASSERT_IS_OK(mStream->get()->getStreamCommon(&common));
        ASSERT_IS_OK(common->prepareToClose());
        LOG(DEBUG) << __func__ << ": joining " << IOTraits<Stream>::directionStr << " worker...";
        mWorker->join();
        EXPECT_FALSE(mWorker->hasError()) << mWorker->getError();
        EXPECT_EQ("", mWorkerDriver->getUnexpectedStateTransition());
        if (IOTraits<Stream>::is_input) {
        if (IOTraits<Stream>::is_input) {
            EXPECT_TRUE(driver.hasObservablePositionIncrease());
            EXPECT_TRUE(mWorkerDriver->hasObservablePositionIncrease());
        }
        }
        EXPECT_FALSE(driver.hasRetrogradeObservablePosition());
        EXPECT_FALSE(mWorkerDriver->hasRetrogradeObservablePosition());
        mWorker.reset();
        mWorkerDriver.reset();
    }

    void sendBurstCommands() {
        ASSERT_NO_FATAL_FAILURE(sendBurstCommandsStartWorker());
        ASSERT_NO_FATAL_FAILURE(sendBurstCommandsJoinWorker());
    }
    }

    bool skipTest() const { return mSkipTest; }
    bool skipTest() const { return mSkipTest; }


  private:
  private:
@@ -4337,6 +4357,8 @@ class WithRemoteSubmix {
    std::unique_ptr<WithDevicePortConnectedState> mConnectedPort;
    std::unique_ptr<WithDevicePortConnectedState> mConnectedPort;
    std::unique_ptr<WithAudioPatch> mPatch;
    std::unique_ptr<WithAudioPatch> mPatch;
    std::unique_ptr<WithStream<Stream>> mStream;
    std::unique_ptr<WithStream<Stream>> mStream;
    std::unique_ptr<StreamLogicDefaultDriver> mWorkerDriver;
    std::unique_ptr<typename IOTraits<Stream>::Worker> mWorker;
};
};


class AudioModuleRemoteSubmix : public AudioCoreModule {
class AudioModuleRemoteSubmix : public AudioCoreModule {
@@ -4350,18 +4372,15 @@ class AudioModuleRemoteSubmix : public AudioCoreModule {
};
};


TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenNoInput) {
TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenNoInput) {
    // open output stream
    WithRemoteSubmix<IStreamOut> streamOut;
    WithRemoteSubmix<IStreamOut> streamOut;
    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
    if (streamOut.skipTest()) {
    if (streamOut.skipTest()) {
        GTEST_SKIP() << "No mix port for attached devices";
        GTEST_SKIP() << "No mix port for attached devices";
    }
    }
    // write something to stream
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands());
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands());
}
}


TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenInputStuck) {
TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenInputStuck) {
    // open output stream
    WithRemoteSubmix<IStreamOut> streamOut;
    WithRemoteSubmix<IStreamOut> streamOut;
    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
    if (streamOut.skipTest()) {
    if (streamOut.skipTest()) {
@@ -4370,19 +4389,16 @@ TEST_P(AudioModuleRemoteSubmix, OutputDoesNotBlockWhenInputStuck) {
    auto address = streamOut.getAudioDeviceAddress();
    auto address = streamOut.getAudioDeviceAddress();
    ASSERT_TRUE(address.has_value());
    ASSERT_TRUE(address.has_value());


    // open input stream
    WithRemoteSubmix<IStreamIn> streamIn(address.value());
    WithRemoteSubmix<IStreamIn> streamIn(address.value());
    ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get()));
    ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get()));
    if (streamIn.skipTest()) {
    if (streamIn.skipTest()) {
        GTEST_SKIP() << "No mix port for attached devices";
        GTEST_SKIP() << "No mix port for attached devices";
    }
    }


    // write something to stream
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands());
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands());
}
}


TEST_P(AudioModuleRemoteSubmix, OutputAndInput) {
TEST_P(AudioModuleRemoteSubmix, OutputAndInput) {
    // open output stream
    WithRemoteSubmix<IStreamOut> streamOut;
    WithRemoteSubmix<IStreamOut> streamOut;
    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
    if (streamOut.skipTest()) {
    if (streamOut.skipTest()) {
@@ -4391,21 +4407,20 @@ TEST_P(AudioModuleRemoteSubmix, OutputAndInput) {
    auto address = streamOut.getAudioDeviceAddress();
    auto address = streamOut.getAudioDeviceAddress();
    ASSERT_TRUE(address.has_value());
    ASSERT_TRUE(address.has_value());


    // open input stream
    WithRemoteSubmix<IStreamIn> streamIn(address.value());
    WithRemoteSubmix<IStreamIn> streamIn(address.value());
    ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get()));
    ASSERT_NO_FATAL_FAILURE(streamIn.SetUp(module.get(), moduleConfig.get()));
    if (streamIn.skipTest()) {
    if (streamIn.skipTest()) {
        GTEST_SKIP() << "No mix port for attached devices";
        GTEST_SKIP() << "No mix port for attached devices";
    }
    }


    // write something to stream
    // Start writing into the output stream.
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands());
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommandsStartWorker());
    // read from input stream
    // Simultaneously, read from the input stream.
    ASSERT_NO_FATAL_FAILURE(streamIn.sendBurstCommands());
    ASSERT_NO_FATAL_FAILURE(streamIn.sendBurstCommands());
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommandsJoinWorker());
}
}


TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) {
TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) {
    // open output stream
    WithRemoteSubmix<IStreamOut> streamOut;
    WithRemoteSubmix<IStreamOut> streamOut;
    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
    ASSERT_NO_FATAL_FAILURE(streamOut.SetUp(module.get(), moduleConfig.get()));
    if (streamOut.skipTest()) {
    if (streamOut.skipTest()) {
@@ -4414,14 +4429,13 @@ TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) {
    auto address = streamOut.getAudioDeviceAddress();
    auto address = streamOut.getAudioDeviceAddress();
    ASSERT_TRUE(address.has_value());
    ASSERT_TRUE(address.has_value());


    // connect remote submix input device port
    // Connect remote submix input device port.
    auto port = WithRemoteSubmix<IStreamIn>::getRemoteSubmixAudioPort(moduleConfig.get(),
    auto port = WithRemoteSubmix<IStreamIn>::getRemoteSubmixAudioPort(moduleConfig.get(),
                                                                      address.value());
                                                                      address.value());
    ASSERT_TRUE(port.has_value()) << "Device AudioPort for remote submix not found";
    ASSERT_TRUE(port.has_value()) << "Device AudioPort for remote submix not found";
    WithDevicePortConnectedState connectedInputPort(port.value());
    WithDevicePortConnectedState connectedInputPort(port.value());
    ASSERT_NO_FATAL_FAILURE(connectedInputPort.SetUp(module.get(), moduleConfig.get()));
    ASSERT_NO_FATAL_FAILURE(connectedInputPort.SetUp(module.get(), moduleConfig.get()));


    // open input streams
    const int streamInCount = 3;
    const int streamInCount = 3;
    std::vector<std::unique_ptr<WithRemoteSubmix<IStreamIn>>> streamIns(streamInCount);
    std::vector<std::unique_ptr<WithRemoteSubmix<IStreamIn>>> streamIns(streamInCount);
    for (int i = 0; i < streamInCount; i++) {
    for (int i = 0; i < streamInCount; i++) {
@@ -4432,13 +4446,16 @@ TEST_P(AudioModuleRemoteSubmix, OpenInputMultipleTimes) {
            GTEST_SKIP() << "No mix port for attached devices";
            GTEST_SKIP() << "No mix port for attached devices";
        }
        }
    }
    }
    // write something to output stream
    // Start writing into the output stream.
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommands());
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommandsStartWorker());

    // Simultaneously, read from input streams.
    // read from input streams
    for (int i = 0; i < streamInCount; i++) {
        ASSERT_NO_FATAL_FAILURE(streamIns[i]->sendBurstCommandsStartWorker());
    }
    for (int i = 0; i < streamInCount; i++) {
    for (int i = 0; i < streamInCount; i++) {
        ASSERT_NO_FATAL_FAILURE(streamIns[i]->sendBurstCommands());
        ASSERT_NO_FATAL_FAILURE(streamIns[i]->sendBurstCommandsJoinWorker());
    }
    }
    ASSERT_NO_FATAL_FAILURE(streamOut.sendBurstCommandsJoinWorker());
}
}


INSTANTIATE_TEST_SUITE_P(AudioModuleRemoteSubmixTest, AudioModuleRemoteSubmix,
INSTANTIATE_TEST_SUITE_P(AudioModuleRemoteSubmixTest, AudioModuleRemoteSubmix,