Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 06085453 authored by Mikhail Naganov's avatar Mikhail Naganov
Browse files

audio: Fix remote submix module I/O timing and atomicity

Similar to the primary module implementation, align the time
spent in the transfer operation with the duration of the audio
being transferred.

Change the read operation to ingest as much data as possible
during the audio burst duration.

Ensure that checking the existence of a SubmixRoute and adding
a new one is an atomic operation.

Minor improvements to avoid extra synchronization.

In the configuration, lower the maximum number of open streams
to 10 to match the legacy implementation.

Bug: 302132812
Test: atest CtsMediaAudioTestCases --test-filter=".*AudioPlaybackCaptureTest#testPlaybackCaptureDoS"
Change-Id: Iccb6aaac46c039551c3d5f7760b1459168d9cfe5
parent 4181e7db
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -320,9 +320,9 @@ std::unique_ptr<Configuration> getPrimaryConfiguration() {
//    - no profiles specified
//
// Mix ports:
//  * "r_submix output", maximum 20 opened streams, maximum 10 active streams
//  * "r_submix output", maximum 10 opened streams, maximum 10 active streams
//    - profile PCM 16-bit; MONO, STEREO; 8000, 11025, 16000, 32000, 44100, 48000
//  * "r_submix input", maximum 20 opened streams, maximum 10 active streams
//  * "r_submix input", maximum 10 opened streams, maximum 10 active streams
//    - profile PCM 16-bit; MONO, STEREO; 8000, 11025, 16000, 32000, 44100, 48000
//
// Routes:
@@ -355,12 +355,12 @@ std::unique_ptr<Configuration> getRSubmixConfiguration() {
        // Mix ports

        AudioPort rsubmixOutMix =
                createPort(c.nextPortId++, "r_submix output", 0, false, createPortMixExt(20, 10));
                createPort(c.nextPortId++, "r_submix output", 0, false, createPortMixExt(10, 10));
        rsubmixOutMix.profiles = remoteSubmixPcmAudioProfiles;
        c.ports.push_back(rsubmixOutMix);

        AudioPort rsubmixInMix =
                createPort(c.nextPortId++, "r_submix input", 0, true, createPortMixExt(20, 10));
                createPort(c.nextPortId++, "r_submix input", 0, true, createPortMixExt(10, 10));
        rsubmixInMix.profiles = remoteSubmixPcmAudioProfiles;
        c.ports.push_back(rsubmixInMix);

+4 −0
Original line number Diff line number Diff line
@@ -71,6 +71,10 @@ class StreamRemoteSubmix : public StreamCommonImpl {
    static constexpr int kMaxReadFailureAttempts = 3;
    // 5ms between two read attempts when pipe is empty
    static constexpr int kReadAttemptSleepUs = 5000;

    long mStartTimeNs = 0;
    long mFramesSinceStart = 0;
    int mReadErrorCount = 0;
};

class StreamInRemoteSubmix final : public StreamIn, public StreamSwitcher {
+76 −95
Original line number Diff line number Diff line
@@ -16,6 +16,9 @@

#define LOG_TAG "AHAL_StreamRemoteSubmix"
#include <android-base/logging.h>
#include <audio_utils/clock.h>
#include <error/Result.h>
#include <error/expected_utils.h>

#include "core-impl/StreamRemoteSubmix.h"

@@ -50,7 +53,6 @@ std::map<AudioDeviceAddress, std::shared_ptr<SubmixRoute>> StreamRemoteSubmix::s
        if (routeItr != sSubmixRoutes.end()) {
            mCurrentRoute = routeItr->second;
        }
    }
        // If route is not available for this port, add it.
        if (mCurrentRoute == nullptr) {
            // Initialize the pipe.
@@ -59,11 +61,9 @@ std::map<AudioDeviceAddress, std::shared_ptr<SubmixRoute>> StreamRemoteSubmix::s
                LOG(ERROR) << __func__ << ": create pipe failed";
                return ::android::NO_INIT;
            }
        {
            std::lock_guard guard(sSubmixRoutesLock);
            sSubmixRoutes.emplace(mDeviceAddress, mCurrentRoute);
        }
    } else {
    }
    if (!mCurrentRoute->isStreamConfigValid(mIsInput, mStreamConfig)) {
        LOG(ERROR) << __func__ << ": invalid stream config";
        return ::android::NO_INIT;
@@ -82,7 +82,6 @@ std::map<AudioDeviceAddress, std::shared_ptr<SubmixRoute>> StreamRemoteSubmix::s
            return ::android::NO_INIT;
        }
    }
    }

    mCurrentRoute->openStream(mIsInput);
    return ::android::OK;
@@ -110,6 +109,8 @@ std::map<AudioDeviceAddress, std::shared_ptr<SubmixRoute>> StreamRemoteSubmix::s

::android::status_t StreamRemoteSubmix::start() {
    mCurrentRoute->exitStandby(mIsInput);
    mStartTimeNs = ::android::uptimeNanos();
    mFramesSinceStart = 0;
    return ::android::OK;
}

@@ -161,8 +162,21 @@ void StreamRemoteSubmix::shutdown() {
    *latencyMs = getDelayInUsForFrameCount(getStreamPipeSizeInFrames()) / 1000;
    LOG(VERBOSE) << __func__ << ": Latency " << *latencyMs << "ms";
    mCurrentRoute->exitStandby(mIsInput);
    return (mIsInput ? inRead(buffer, frameCount, actualFrameCount)
    RETURN_STATUS_IF_ERROR(mIsInput ? inRead(buffer, frameCount, actualFrameCount)
                                    : outWrite(buffer, frameCount, actualFrameCount));
    const long bufferDurationUs =
            (*actualFrameCount) * MICROS_PER_SECOND / mContext.getSampleRate();
    const long totalDurationUs = (::android::uptimeNanos() - mStartTimeNs) / NANOS_PER_MICROSECOND;
    mFramesSinceStart += *actualFrameCount;
    const long totalOffsetUs =
            mFramesSinceStart * MICROS_PER_SECOND / mContext.getSampleRate() - totalDurationUs;
    LOG(VERBOSE) << __func__ << ": totalOffsetUs " << totalOffsetUs;
    if (totalOffsetUs > 0) {
        const long sleepTimeUs = std::min(totalOffsetUs, bufferDurationUs);
        LOG(VERBOSE) << __func__ << ": sleeping for " << sleepTimeUs << " us";
        usleep(sleepTimeUs);
    }
    return ::android::OK;
}

::android::status_t StreamRemoteSubmix::refinePosition(StreamDescriptor::Position* position) {
@@ -200,12 +214,7 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
    if (sink != nullptr) {
        if (sink->isShutdown()) {
            sink.clear();
            const auto delayUs = getDelayInUsForFrameCount(frameCount);
            LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write, sleeping for "
                       << delayUs << " us";
            // the pipe has already been shutdown, this buffer will be lost but we must
            // simulate timing so we don't drain the output faster than realtime
            usleep(delayUs);
            LOG(DEBUG) << __func__ << ": pipe shutdown, ignoring the write";
            *actualFrameCount = frameCount;
            return ::android::OK;
        }
@@ -214,6 +223,9 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
        return ::android::UNKNOWN_ERROR;
    }

    LOG(VERBOSE) << __func__ << ": " << mDeviceAddress.toString() << ", " << frameCount
                 << " frames";

    const bool shouldBlockWrite = mCurrentRoute->shouldBlockWrite();
    size_t availableToWrite = sink->availableToWrite();
    // NOTE: sink has been checked above and sink and source life cycles are synchronized
@@ -236,6 +248,8 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
    availableToWrite = sink->availableToWrite();

    if (!shouldBlockWrite && frameCount > availableToWrite) {
        LOG(WARNING) << __func__ << ": writing " << availableToWrite << " vs. requested "
                     << frameCount;
        // Truncate the request to avoid blocking.
        frameCount = availableToWrite;
    }
@@ -258,92 +272,59 @@ size_t StreamRemoteSubmix::getStreamPipeSizeInFrames() {
        *actualFrameCount = 0;
        return ::android::UNKNOWN_ERROR;
    }
    LOG(VERBOSE) << __func__ << ": wrote " << writtenFrames << "frames";
    if (writtenFrames > 0 && frameCount > (size_t)writtenFrames) {
        LOG(WARNING) << __func__ << ": wrote " << writtenFrames << " vs. requested " << frameCount;
    }
    *actualFrameCount = writtenFrames;
    return ::android::OK;
}

::android::status_t StreamRemoteSubmix::inRead(void* buffer, size_t frameCount,
                                               size_t* actualFrameCount) {
    // in any case, it is emulated that data for the entire buffer was available
    memset(buffer, 0, mStreamConfig.frameSize * frameCount);
    *actualFrameCount = frameCount;

    // about to read from audio source
    sp<MonoPipeReader> source = mCurrentRoute->getSource();
    if (source == nullptr) {
        int readErrorCount = mCurrentRoute->notifyReadError();
        if (readErrorCount < kMaxReadErrorLogs) {
        if (++mReadErrorCount < kMaxReadErrorLogs) {
            LOG(ERROR) << __func__
                       << ": no audio pipe yet we're trying to read! (not all errors will be "
                          "logged)";
        } else {
            LOG(ERROR) << __func__ << ": Read errors " << readErrorCount;
        }
        const auto delayUs = getDelayInUsForFrameCount(frameCount);
        LOG(DEBUG) << __func__ << ": no source, ignoring the read, sleeping for " << delayUs
                   << " us";
        usleep(delayUs);
        memset(buffer, 0, mStreamConfig.frameSize * frameCount);
        *actualFrameCount = frameCount;
        return ::android::OK;
    }

    LOG(VERBOSE) << __func__ << ": " << mDeviceAddress.toString() << ", " << frameCount
                 << " frames";
    // read the data from the pipe
    int attempts = 0;
    const long delayUs = kReadAttemptSleepUs;
    char* buff = (char*)buffer;
    size_t remainingFrames = frameCount;
    int availableToRead = source->availableToRead();

    while ((remainingFrames > 0) && (availableToRead > 0) && (attempts < kMaxReadFailureAttempts)) {
        LOG(VERBOSE) << __func__ << ": frames available to read " << availableToRead;

    size_t actuallyRead = 0;
    long remainingFrames = frameCount;
    const long deadlineTimeNs = ::android::uptimeNanos() +
                                getDelayInUsForFrameCount(frameCount) * NANOS_PER_MICROSECOND;
    while (remainingFrames > 0) {
        ssize_t framesRead = source->read(buff, remainingFrames);

        LOG(VERBOSE) << __func__ << ": frames read " << framesRead;

        if (framesRead > 0) {
            remainingFrames -= framesRead;
            buff += framesRead * mStreamConfig.frameSize;
            availableToRead -= framesRead;
            LOG(VERBOSE) << __func__ << ": (attempts = " << attempts << ") got " << framesRead
            LOG(VERBOSE) << __func__ << ": got " << framesRead
                         << " frames, remaining =" << remainingFrames;
        } else {
            attempts++;
            LOG(WARNING) << __func__ << ": read returned " << framesRead
                         << " , read failure attempts = " << attempts << ", sleeping for "
                         << delayUs << " us";
            usleep(delayUs);
            actuallyRead += framesRead;
        }
        if (::android::uptimeNanos() >= deadlineTimeNs) break;
        if (framesRead <= 0) {
            LOG(VERBOSE) << __func__ << ": read returned " << framesRead
                         << ", read failure, sleeping for " << kReadAttemptSleepUs << " us";
            usleep(kReadAttemptSleepUs);
        }
    // done using the source
    source.clear();

    if (remainingFrames > 0) {
        const size_t remainingBytes = remainingFrames * mStreamConfig.frameSize;
        LOG(VERBOSE) << __func__ << ": clearing remaining_frames = " << remainingFrames;
        memset(((char*)buffer) + (mStreamConfig.frameSize * frameCount) - remainingBytes, 0,
               remainingBytes);
    }

    long readCounterFrames = mCurrentRoute->updateReadCounterFrames(frameCount);
    *actualFrameCount = frameCount;

    // compute how much we need to sleep after reading the data by comparing the wall clock with
    //   the projected time at which we should return.
    // wall clock after reading from the pipe
    auto recordDurationUs = std::chrono::duration_cast<std::chrono::microseconds>(
            std::chrono::steady_clock::now() - mCurrentRoute->getRecordStartTime());

    // readCounterFrames contains the number of frames that have been read since the beginning of
    // recording (including this call): it's converted to usec and compared to how long we've been
    // recording for, which gives us how long we must wait to sync the projected recording time, and
    // the observed recording time.
    const long projectedVsObservedOffsetUs =
            getDelayInUsForFrameCount(readCounterFrames) - recordDurationUs.count();

    LOG(VERBOSE) << __func__ << ": record duration " << recordDurationUs.count()
                 << " us, will wait: " << projectedVsObservedOffsetUs << " us";
    if (projectedVsObservedOffsetUs > 0) {
        usleep(projectedVsObservedOffsetUs);
    if (actuallyRead < frameCount) {
        LOG(WARNING) << __func__ << ": read " << actuallyRead << " vs. requested " << frameCount;
    }
    mCurrentRoute->updateReadCounterFrames(*actualFrameCount);
    return ::android::OK;
}

+0 −9
Original line number Diff line number Diff line
@@ -81,11 +81,6 @@ bool SubmixRoute::shouldBlockWrite() {
    return (mStreamInOpen || (mStreamInStandby && (mReadCounterFrames != 0)));
}

int SubmixRoute::notifyReadError() {
    std::lock_guard guard(mLock);
    return ++mReadErrorCount;
}

long SubmixRoute::updateReadCounterFrames(size_t frameCount) {
    std::lock_guard guard(mLock);
    mReadCounterFrames += frameCount;
@@ -103,7 +98,6 @@ void SubmixRoute::openStream(bool isInput) {
        }
        mStreamInStandby = true;
        mReadCounterFrames = 0;
        mReadErrorCount = 0;
    } else {
        mStreamOutOpen = true;
    }
@@ -214,9 +208,6 @@ void SubmixRoute::exitStandby(bool isInput) {
        if (mStreamInStandby || mStreamOutStandbyTransition) {
            mStreamInStandby = false;
            mStreamOutStandbyTransition = false;
            // keep track of when we exit input standby (== first read == start "real recording")
            // or when we start recording silence, and reset projected time
            mRecordStartTime = std::chrono::steady_clock::now();
            mReadCounterFrames = 0;
        }
    } else {
+0 −12
Original line number Diff line number Diff line
@@ -16,7 +16,6 @@

#pragma once

#include <chrono>
#include <mutex>

#include <android-base/thread_annotations.h>
@@ -83,14 +82,6 @@ class SubmixRoute {
        std::lock_guard guard(mLock);
        return mReadCounterFrames;
    }
    int getReadErrorCount() {
        std::lock_guard guard(mLock);
        return mReadErrorCount;
    }
    std::chrono::time_point<std::chrono::steady_clock> getRecordStartTime() {
        std::lock_guard guard(mLock);
        return mRecordStartTime;
    }
    sp<MonoPipe> getSink() {
        std::lock_guard guard(mLock);
        return mSink;
@@ -126,9 +117,6 @@ class SubmixRoute {
    bool mStreamOutStandby GUARDED_BY(mLock) = true;
    // how many frames have been requested to be read since standby
    long mReadCounterFrames GUARDED_BY(mLock) = 0;
    int mReadErrorCount GUARDED_BY(mLock) = 0;
    // wall clock when recording starts
    std::chrono::time_point<std::chrono::steady_clock> mRecordStartTime GUARDED_BY(mLock);

    // Pipe variables: they handle the ring buffer that "pipes" audio:
    //  - from the submix virtual audio output == what needs to be played