Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2ef92bc9 authored by Mikhail Naganov's avatar Mikhail Naganov
Browse files

Stop PCM streams before attempting to close them

In order to break out from a data wait loop in the driver
the stream state must be changed from "running". This is
achieved by calling `pcm_stop` from the stream thread.

Added a dedicated "I/O" thread to 'StreamAlsa' to be able
to call `pcm_stop` while an I/O operation is running. The
"I/O" thread is connected to the worker thread by means
of a 'MonoPipe'.

Bug: 364960013
Test: atest CtsMediaAudioTestCases
Test: atest VtsHalAudioCoreTargetTest
Change-Id: Ibb020d25f42df54baf46a37b50577cce294dc053
parent 4007dfd5
Loading
Loading
Loading
Loading
+37 −38
Original line number Diff line number Diff line
@@ -704,44 +704,7 @@ ndk::ScopedAStatus StreamCommonImpl::initInstance(
        LOG(ERROR) << __func__ << ": Worker start error: " << mWorker->getError();
        return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
    }
    if (auto flags = getContext().getFlags();
        (flags.getTag() == AudioIoFlags::Tag::input &&
         isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::input>(),
                              AudioInputFlags::FAST)) ||
        (flags.getTag() == AudioIoFlags::Tag::output &&
         (isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                               AudioOutputFlags::FAST) ||
          isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                               AudioOutputFlags::SPATIALIZER)))) {
        // FAST workers should be run with a SCHED_FIFO scheduler, however the host process
        // might be lacking the capability to request it, thus a failure to set is not an error.
        pid_t workerTid = mWorker->getTid();
        if (workerTid > 0) {
            constexpr int32_t kRTPriorityMin = 1;  // SchedulingPolicyService.PRIORITY_MIN (Java).
            constexpr int32_t kRTPriorityMax = 3;  // SchedulingPolicyService.PRIORITY_MAX (Java).
            int priorityBoost = kRTPriorityMax;
            if (flags.getTag() == AudioIoFlags::Tag::output &&
                isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                                     AudioOutputFlags::SPATIALIZER)) {
                const int32_t sptPrio =
                        property_get_int32("audio.spatializer.priority", kRTPriorityMin);
                if (sptPrio >= kRTPriorityMin && sptPrio <= kRTPriorityMax) {
                    priorityBoost = sptPrio;
                } else {
                    LOG(WARNING) << __func__ << ": invalid spatializer priority: " << sptPrio;
                    return ndk::ScopedAStatus::ok();
                }
            }
            struct sched_param param = {
                    .sched_priority = priorityBoost,
            };
            if (sched_setscheduler(workerTid, SCHED_FIFO | SCHED_RESET_ON_FORK, &param) != 0) {
                PLOG(WARNING) << __func__ << ": failed to set FIFO scheduler and priority";
            }
        } else {
            LOG(WARNING) << __func__ << ": invalid worker tid: " << workerTid;
        }
    }
    setWorkerThreadPriority(mWorker->getTid());
    getContext().getCommandMQ()->setErrorHandler(
            fmqErrorHandler<StreamContext::CommandMQ::Error>("CommandMQ"));
    getContext().getReplyMQ()->setErrorHandler(
@@ -830,6 +793,42 @@ void StreamCommonImpl::cleanupWorker() {
    }
}

void StreamCommonImpl::setWorkerThreadPriority(pid_t workerTid) {
    // FAST workers should be run with a SCHED_FIFO scheduler, however the host process
    // might be lacking the capability to request it, thus a failure to set is not an error.
    if (auto flags = getContext().getFlags();
        (flags.getTag() == AudioIoFlags::Tag::input &&
         isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::input>(),
                              AudioInputFlags::FAST)) ||
        (flags.getTag() == AudioIoFlags::Tag::output &&
         (isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                               AudioOutputFlags::FAST) ||
          isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                               AudioOutputFlags::SPATIALIZER)))) {
        constexpr int32_t kRTPriorityMin = 1;  // SchedulingPolicyService.PRIORITY_MIN (Java).
        constexpr int32_t kRTPriorityMax = 3;  // SchedulingPolicyService.PRIORITY_MAX (Java).
        int priorityBoost = kRTPriorityMax;
        if (flags.getTag() == AudioIoFlags::Tag::output &&
            isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                                 AudioOutputFlags::SPATIALIZER)) {
            const int32_t sptPrio =
                    property_get_int32("audio.spatializer.priority", kRTPriorityMin);
            if (sptPrio >= kRTPriorityMin && sptPrio <= kRTPriorityMax) {
                priorityBoost = sptPrio;
            } else {
                LOG(WARNING) << __func__ << ": invalid spatializer priority: " << sptPrio;
                return;
            }
        }
        struct sched_param param = {
                .sched_priority = priorityBoost,
        };
        if (sched_setscheduler(workerTid, SCHED_FIFO | SCHED_RESET_ON_FORK, &param) != 0) {
            PLOG(WARNING) << __func__ << ": failed to set FIFO scheduler and priority";
        }
    }
}

void StreamCommonImpl::stopAndJoinWorker() {
    stopWorker();
    LOG(DEBUG) << __func__ << ": joining the worker thread...";
+151 −9
Original line number Diff line number Diff line
@@ -23,9 +23,12 @@
#include <Utils.h>
#include <audio_utils/clock.h>
#include <error/expected_utils.h>
#include <media/AidlConversionCppNdk.h>

#include "core-impl/StreamAlsa.h"

using aidl::android::hardware::audio::common::getChannelCount;

namespace aidl::android::hardware::audio::core {

StreamAlsa::StreamAlsa(StreamContext* context, const Metadata& metadata, int readWriteRetries)
@@ -41,6 +44,34 @@ StreamAlsa::~StreamAlsa() {
    cleanupWorker();
}

::android::NBAIO_Format StreamAlsa::getPipeFormat() const {
    const audio_format_t audioFormat = VALUE_OR_FATAL(
            aidl2legacy_AudioFormatDescription_audio_format_t(getContext().getFormat()));
    const int channelCount = getChannelCount(getContext().getChannelLayout());
    return ::android::Format_from_SR_C(getContext().getSampleRate(), channelCount, audioFormat);
}

::android::sp<::android::MonoPipe> StreamAlsa::makeSink(bool writeCanBlock) {
    const ::android::NBAIO_Format format = getPipeFormat();
    auto sink = ::android::sp<::android::MonoPipe>::make(mBufferSizeFrames, format, writeCanBlock);
    const ::android::NBAIO_Format offers[1] = {format};
    size_t numCounterOffers = 0;
    ssize_t index = sink->negotiate(offers, 1, nullptr, numCounterOffers);
    LOG_IF(FATAL, index != 0) << __func__ << ": Negotiation for the sink failed, index = " << index;
    return sink;
}

::android::sp<::android::MonoPipeReader> StreamAlsa::makeSource(::android::MonoPipe* pipe) {
    const ::android::NBAIO_Format format = getPipeFormat();
    const ::android::NBAIO_Format offers[1] = {format};
    auto source = ::android::sp<::android::MonoPipeReader>::make(pipe);
    size_t numCounterOffers = 0;
    ssize_t index = source->negotiate(offers, 1, nullptr, numCounterOffers);
    LOG_IF(FATAL, index != 0) << __func__
                              << ": Negotiation for the source failed, index = " << index;
    return source;
}

::android::status_t StreamAlsa::init() {
    return mConfig.has_value() ? ::android::OK : ::android::NO_INIT;
}
@@ -64,7 +95,7 @@ StreamAlsa::~StreamAlsa() {
}

::android::status_t StreamAlsa::standby() {
    mAlsaDeviceProxies.clear();
    teardownIo();
    return ::android::OK;
}

@@ -74,6 +105,8 @@ StreamAlsa::~StreamAlsa() {
        return ::android::OK;
    }
    decltype(mAlsaDeviceProxies) alsaDeviceProxies;
    decltype(mSources) sources;
    decltype(mSinks) sinks;
    for (const auto& device : getDeviceProfiles()) {
        if ((device.direction == PCM_OUT && mIsInput) ||
            (device.direction == PCM_IN && !mIsInput)) {
@@ -95,11 +128,29 @@ StreamAlsa::~StreamAlsa() {
            return ::android::NO_INIT;
        }
        alsaDeviceProxies.push_back(std::move(proxy));
        auto sink = makeSink(mIsInput);  // Do not block the writer when it is on our thread.
        if (sink != nullptr) {
            sinks.push_back(sink);
        } else {
            return ::android::NO_INIT;
        }
        if (auto source = makeSource(sink.get()); source != nullptr) {
            sources.push_back(source);
        } else {
            return ::android::NO_INIT;
        }
    }
    if (alsaDeviceProxies.empty()) {
        return ::android::NO_INIT;
    }
    mAlsaDeviceProxies = std::move(alsaDeviceProxies);
    mSources = std::move(sources);
    mSinks = std::move(sinks);
    mIoThreadIsRunning = true;
    for (size_t i = 0; i < mAlsaDeviceProxies.size(); ++i) {
        mIoThreads.emplace_back(mIsInput ? &StreamAlsa::inputIoThread : &StreamAlsa::outputIoThread,
                                this, i);
    }
    return ::android::OK;
}

@@ -112,15 +163,30 @@ StreamAlsa::~StreamAlsa() {
    const size_t bytesToTransfer = frameCount * mFrameSizeBytes;
    unsigned maxLatency = 0;
    if (mIsInput) {
        // For input case, only support single device.
        proxy_read_with_retries(mAlsaDeviceProxies[0].get(), buffer, bytesToTransfer,
                                mReadWriteRetries);
        maxLatency = proxy_get_latency(mAlsaDeviceProxies[0].get());
        const size_t i = 0;  // For the input case, only support a single device.
        LOG(VERBOSE) << __func__ << ": reading from sink " << i;
        ssize_t framesRead = mSources[i]->read(buffer, frameCount);
        LOG_IF(FATAL, framesRead < 0) << "Error reading from the pipe: " << framesRead;
        if (ssize_t framesMissing = static_cast<ssize_t>(frameCount) - framesRead;
            framesMissing > 0) {
            LOG(WARNING) << __func__ << ": incomplete data received, inserting " << framesMissing
                         << " frames of silence";
            memset(static_cast<char*>(buffer) + framesRead * mFrameSizeBytes, 0,
                   framesMissing * mFrameSizeBytes);
        }
        maxLatency = proxy_get_latency(mAlsaDeviceProxies[i].get());
    } else {
        alsa::applyGain(buffer, mGain, bytesToTransfer, mConfig.value().format, mConfig->channels);
        for (auto& proxy : mAlsaDeviceProxies) {
            proxy_write_with_retries(proxy.get(), buffer, bytesToTransfer, mReadWriteRetries);
            maxLatency = std::max(maxLatency, proxy_get_latency(proxy.get()));
        for (size_t i = 0; i < mAlsaDeviceProxies.size(); ++i) {
            LOG(VERBOSE) << __func__ << ": writing into sink " << i;
            ssize_t framesWritten = mSinks[i]->write(buffer, frameCount);
            LOG_IF(FATAL, framesWritten < 0) << "Error writing into the pipe: " << framesWritten;
            if (ssize_t framesLost = static_cast<ssize_t>(frameCount) - framesWritten;
                framesLost > 0) {
                LOG(WARNING) << __func__ << ": sink " << i << " incomplete data sent, dropping "
                             << framesLost << " frames";
            }
            maxLatency = std::max(maxLatency, proxy_get_latency(mAlsaDeviceProxies[i].get()));
        }
    }
    *actualFrameCount = frameCount;
@@ -164,7 +230,7 @@ StreamAlsa::~StreamAlsa() {
}

void StreamAlsa::shutdown() {
    mAlsaDeviceProxies.clear();
    teardownIo();
}

ndk::ScopedAStatus StreamAlsa::setGain(float gain) {
@@ -172,4 +238,80 @@ ndk::ScopedAStatus StreamAlsa::setGain(float gain) {
    return ndk::ScopedAStatus::ok();
}

void StreamAlsa::inputIoThread(size_t idx) {
#if defined(__ANDROID__)
    setWorkerThreadPriority(pthread_gettid_np(pthread_self()));
    const std::string threadName = (std::string("in_") + std::to_string(idx)).substr(0, 15);
    pthread_setname_np(pthread_self(), threadName.c_str());
#endif
    const size_t bufferSize = mBufferSizeFrames * mFrameSizeBytes;
    std::vector<char> buffer(bufferSize);
    while (mIoThreadIsRunning) {
        if (int ret = proxy_read_with_retries(mAlsaDeviceProxies[idx].get(), &buffer[0], bufferSize,
                                              mReadWriteRetries);
            ret == 0) {
            size_t bufferFramesWritten = 0;
            while (bufferFramesWritten < mBufferSizeFrames) {
                if (!mIoThreadIsRunning) return;
                ssize_t framesWrittenOrError =
                        mSinks[idx]->write(&buffer[0], mBufferSizeFrames - bufferFramesWritten);
                if (framesWrittenOrError >= 0) {
                    bufferFramesWritten += framesWrittenOrError;
                } else {
                    LOG(WARNING) << __func__ << "[" << idx
                                 << "]: Error while writing into the pipe: "
                                 << framesWrittenOrError;
                }
            }
        } else {
            // Errors when the stream is being stopped are expected.
            LOG_IF(WARNING, mIoThreadIsRunning)
                    << __func__ << "[" << idx << "]: Error reading from ALSA: " << ret;
        }
    }
}

void StreamAlsa::outputIoThread(size_t idx) {
#if defined(__ANDROID__)
    setWorkerThreadPriority(pthread_gettid_np(pthread_self()));
    const std::string threadName = (std::string("out_") + std::to_string(idx)).substr(0, 15);
    pthread_setname_np(pthread_self(), threadName.c_str());
#endif
    const size_t bufferSize = mBufferSizeFrames * mFrameSizeBytes;
    std::vector<char> buffer(bufferSize);
    while (mIoThreadIsRunning) {
        ssize_t framesRead = mSources[idx]->read(&buffer[0], mBufferSizeFrames);
        if (framesRead > 0) {
            int ret = proxy_write_with_retries(mAlsaDeviceProxies[idx].get(), &buffer[0],
                                               framesRead * mFrameSizeBytes, mReadWriteRetries);
            // Errors when the stream is being stopped are expected.
            LOG_IF(WARNING, ret != 0 && mIoThreadIsRunning)
                    << __func__ << "[" << idx << "]: Error writing into ALSA: " << ret;
        }
    }
}

void StreamAlsa::teardownIo() {
    mIoThreadIsRunning = false;
    if (mIsInput) {
        LOG(DEBUG) << __func__ << ": shutting down pipes";
        for (auto& sink : mSinks) {
            sink->shutdown(true);
        }
    }
    LOG(DEBUG) << __func__ << ": stopping PCM streams";
    for (const auto& proxy : mAlsaDeviceProxies) {
        proxy_stop(proxy.get());
    }
    LOG(DEBUG) << __func__ << ": joining threads";
    for (auto& thread : mIoThreads) {
        if (thread.joinable()) thread.join();
    }
    mIoThreads.clear();
    LOG(DEBUG) << __func__ << ": closing PCM devices";
    mAlsaDeviceProxies.clear();
    mSources.clear();
    mSinks.clear();
}

}  // namespace aidl::android::hardware::audio::core
+2 −2
Original line number Diff line number Diff line
@@ -48,8 +48,8 @@ class DeviceProxy {
  public:
    DeviceProxy();  // Constructs a "null" proxy.
    explicit DeviceProxy(const DeviceProfile& deviceProfile);
    alsa_device_profile* getProfile() { return mProfile.get(); }
    alsa_device_proxy* get() { return mProxy.get(); }
    alsa_device_profile* getProfile() const { return mProfile.get(); }
    alsa_device_proxy* get() const { return mProxy.get(); }

  private:
    static void alsaProxyDeleter(alsa_device_proxy* proxy);
+1 −0
Original line number Diff line number Diff line
@@ -475,6 +475,7 @@ class StreamCommonImpl : virtual public StreamCommonInterface, virtual public Dr
    // the destructor in order to stop and join the worker thread in the case when the client
    // has not called 'IStreamCommon::close' method.
    void cleanupWorker();
    void setWorkerThreadPriority(pid_t workerTid);
    void stopAndJoinWorker();
    void stopWorker();

+20 −2
Original line number Diff line number Diff line
@@ -16,9 +16,14 @@

#pragma once

#include <atomic>
#include <optional>
#include <thread>
#include <vector>

#include <media/nbaio/MonoPipe.h>
#include <media/nbaio/MonoPipeReader.h>

#include "Stream.h"
#include "alsa/Utils.h"

@@ -57,11 +62,24 @@ class StreamAlsa : public StreamCommonImpl {
    const bool mIsInput;
    const std::optional<struct pcm_config> mConfig;
    const int mReadWriteRetries;
    // All fields below are only used on the worker thread.
    std::vector<alsa::DeviceProxy> mAlsaDeviceProxies;

  private:
    ::android::NBAIO_Format getPipeFormat() const;
    ::android::sp<::android::MonoPipe> makeSink(bool writeCanBlock);
    ::android::sp<::android::MonoPipeReader> makeSource(::android::MonoPipe* pipe);
    void inputIoThread(size_t idx);
    void outputIoThread(size_t idx);
    void teardownIo();

    std::atomic<float> mGain = 1.0;

    // All fields below are only used on the worker thread.
    std::vector<alsa::DeviceProxy> mAlsaDeviceProxies;
    // Only 'libnbaio_mono' is vendor-accessible, thus no access to the multi-reader Pipe.
    std::vector<::android::sp<::android::MonoPipe>> mSinks;
    std::vector<::android::sp<::android::MonoPipeReader>> mSources;
    std::vector<std::thread> mIoThreads;
    std::atomic<bool> mIoThreadIsRunning = false;  // used by all threads
};

}  // namespace aidl::android::hardware::audio::core