Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit db53d0a5 authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Stop PCM streams before attempting to close them" into main

parents f046efb6 2ef92bc9
Loading
Loading
Loading
Loading
+37 −38
Original line number Diff line number Diff line
@@ -704,44 +704,7 @@ ndk::ScopedAStatus StreamCommonImpl::initInstance(
        LOG(ERROR) << __func__ << ": Worker start error: " << mWorker->getError();
        return ndk::ScopedAStatus::fromExceptionCode(EX_ILLEGAL_STATE);
    }
    if (auto flags = getContext().getFlags();
        (flags.getTag() == AudioIoFlags::Tag::input &&
         isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::input>(),
                              AudioInputFlags::FAST)) ||
        (flags.getTag() == AudioIoFlags::Tag::output &&
         (isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                               AudioOutputFlags::FAST) ||
          isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                               AudioOutputFlags::SPATIALIZER)))) {
        // FAST workers should be run with a SCHED_FIFO scheduler, however the host process
        // might be lacking the capability to request it, thus a failure to set is not an error.
        pid_t workerTid = mWorker->getTid();
        if (workerTid > 0) {
            constexpr int32_t kRTPriorityMin = 1;  // SchedulingPolicyService.PRIORITY_MIN (Java).
            constexpr int32_t kRTPriorityMax = 3;  // SchedulingPolicyService.PRIORITY_MAX (Java).
            int priorityBoost = kRTPriorityMax;
            if (flags.getTag() == AudioIoFlags::Tag::output &&
                isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                                     AudioOutputFlags::SPATIALIZER)) {
                const int32_t sptPrio =
                        property_get_int32("audio.spatializer.priority", kRTPriorityMin);
                if (sptPrio >= kRTPriorityMin && sptPrio <= kRTPriorityMax) {
                    priorityBoost = sptPrio;
                } else {
                    LOG(WARNING) << __func__ << ": invalid spatializer priority: " << sptPrio;
                    return ndk::ScopedAStatus::ok();
                }
            }
            struct sched_param param = {
                    .sched_priority = priorityBoost,
            };
            if (sched_setscheduler(workerTid, SCHED_FIFO | SCHED_RESET_ON_FORK, &param) != 0) {
                PLOG(WARNING) << __func__ << ": failed to set FIFO scheduler and priority";
            }
        } else {
            LOG(WARNING) << __func__ << ": invalid worker tid: " << workerTid;
        }
    }
    setWorkerThreadPriority(mWorker->getTid());
    getContext().getCommandMQ()->setErrorHandler(
            fmqErrorHandler<StreamContext::CommandMQ::Error>("CommandMQ"));
    getContext().getReplyMQ()->setErrorHandler(
@@ -830,6 +793,42 @@ void StreamCommonImpl::cleanupWorker() {
    }
}

void StreamCommonImpl::setWorkerThreadPriority(pid_t workerTid) {
    // FAST workers should be run with a SCHED_FIFO scheduler, however the host process
    // might be lacking the capability to request it, thus a failure to set is not an error.
    if (auto flags = getContext().getFlags();
        (flags.getTag() == AudioIoFlags::Tag::input &&
         isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::input>(),
                              AudioInputFlags::FAST)) ||
        (flags.getTag() == AudioIoFlags::Tag::output &&
         (isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                               AudioOutputFlags::FAST) ||
          isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                               AudioOutputFlags::SPATIALIZER)))) {
        constexpr int32_t kRTPriorityMin = 1;  // SchedulingPolicyService.PRIORITY_MIN (Java).
        constexpr int32_t kRTPriorityMax = 3;  // SchedulingPolicyService.PRIORITY_MAX (Java).
        int priorityBoost = kRTPriorityMax;
        if (flags.getTag() == AudioIoFlags::Tag::output &&
            isBitPositionFlagSet(flags.template get<AudioIoFlags::Tag::output>(),
                                 AudioOutputFlags::SPATIALIZER)) {
            const int32_t sptPrio =
                    property_get_int32("audio.spatializer.priority", kRTPriorityMin);
            if (sptPrio >= kRTPriorityMin && sptPrio <= kRTPriorityMax) {
                priorityBoost = sptPrio;
            } else {
                LOG(WARNING) << __func__ << ": invalid spatializer priority: " << sptPrio;
                return;
            }
        }
        struct sched_param param = {
                .sched_priority = priorityBoost,
        };
        if (sched_setscheduler(workerTid, SCHED_FIFO | SCHED_RESET_ON_FORK, &param) != 0) {
            PLOG(WARNING) << __func__ << ": failed to set FIFO scheduler and priority";
        }
    }
}

void StreamCommonImpl::stopAndJoinWorker() {
    stopWorker();
    LOG(DEBUG) << __func__ << ": joining the worker thread...";
+151 −9
Original line number Diff line number Diff line
@@ -23,9 +23,12 @@
#include <Utils.h>
#include <audio_utils/clock.h>
#include <error/expected_utils.h>
#include <media/AidlConversionCppNdk.h>

#include "core-impl/StreamAlsa.h"

using aidl::android::hardware::audio::common::getChannelCount;

namespace aidl::android::hardware::audio::core {

StreamAlsa::StreamAlsa(StreamContext* context, const Metadata& metadata, int readWriteRetries)
@@ -41,6 +44,34 @@ StreamAlsa::~StreamAlsa() {
    cleanupWorker();
}

::android::NBAIO_Format StreamAlsa::getPipeFormat() const {
    const audio_format_t audioFormat = VALUE_OR_FATAL(
            aidl2legacy_AudioFormatDescription_audio_format_t(getContext().getFormat()));
    const int channelCount = getChannelCount(getContext().getChannelLayout());
    return ::android::Format_from_SR_C(getContext().getSampleRate(), channelCount, audioFormat);
}

::android::sp<::android::MonoPipe> StreamAlsa::makeSink(bool writeCanBlock) {
    const ::android::NBAIO_Format format = getPipeFormat();
    auto sink = ::android::sp<::android::MonoPipe>::make(mBufferSizeFrames, format, writeCanBlock);
    const ::android::NBAIO_Format offers[1] = {format};
    size_t numCounterOffers = 0;
    ssize_t index = sink->negotiate(offers, 1, nullptr, numCounterOffers);
    LOG_IF(FATAL, index != 0) << __func__ << ": Negotiation for the sink failed, index = " << index;
    return sink;
}

::android::sp<::android::MonoPipeReader> StreamAlsa::makeSource(::android::MonoPipe* pipe) {
    const ::android::NBAIO_Format format = getPipeFormat();
    const ::android::NBAIO_Format offers[1] = {format};
    auto source = ::android::sp<::android::MonoPipeReader>::make(pipe);
    size_t numCounterOffers = 0;
    ssize_t index = source->negotiate(offers, 1, nullptr, numCounterOffers);
    LOG_IF(FATAL, index != 0) << __func__
                              << ": Negotiation for the source failed, index = " << index;
    return source;
}

::android::status_t StreamAlsa::init() {
    return mConfig.has_value() ? ::android::OK : ::android::NO_INIT;
}
@@ -64,7 +95,7 @@ StreamAlsa::~StreamAlsa() {
}

::android::status_t StreamAlsa::standby() {
    mAlsaDeviceProxies.clear();
    teardownIo();
    return ::android::OK;
}

@@ -74,6 +105,8 @@ StreamAlsa::~StreamAlsa() {
        return ::android::OK;
    }
    decltype(mAlsaDeviceProxies) alsaDeviceProxies;
    decltype(mSources) sources;
    decltype(mSinks) sinks;
    for (const auto& device : getDeviceProfiles()) {
        if ((device.direction == PCM_OUT && mIsInput) ||
            (device.direction == PCM_IN && !mIsInput)) {
@@ -95,11 +128,29 @@ StreamAlsa::~StreamAlsa() {
            return ::android::NO_INIT;
        }
        alsaDeviceProxies.push_back(std::move(proxy));
        auto sink = makeSink(mIsInput);  // Do not block the writer when it is on our thread.
        if (sink != nullptr) {
            sinks.push_back(sink);
        } else {
            return ::android::NO_INIT;
        }
        if (auto source = makeSource(sink.get()); source != nullptr) {
            sources.push_back(source);
        } else {
            return ::android::NO_INIT;
        }
    }
    if (alsaDeviceProxies.empty()) {
        return ::android::NO_INIT;
    }
    mAlsaDeviceProxies = std::move(alsaDeviceProxies);
    mSources = std::move(sources);
    mSinks = std::move(sinks);
    mIoThreadIsRunning = true;
    for (size_t i = 0; i < mAlsaDeviceProxies.size(); ++i) {
        mIoThreads.emplace_back(mIsInput ? &StreamAlsa::inputIoThread : &StreamAlsa::outputIoThread,
                                this, i);
    }
    return ::android::OK;
}

@@ -112,15 +163,30 @@ StreamAlsa::~StreamAlsa() {
    const size_t bytesToTransfer = frameCount * mFrameSizeBytes;
    unsigned maxLatency = 0;
    if (mIsInput) {
        // For input case, only support single device.
        proxy_read_with_retries(mAlsaDeviceProxies[0].get(), buffer, bytesToTransfer,
                                mReadWriteRetries);
        maxLatency = proxy_get_latency(mAlsaDeviceProxies[0].get());
        const size_t i = 0;  // For the input case, only support a single device.
        LOG(VERBOSE) << __func__ << ": reading from sink " << i;
        ssize_t framesRead = mSources[i]->read(buffer, frameCount);
        LOG_IF(FATAL, framesRead < 0) << "Error reading from the pipe: " << framesRead;
        if (ssize_t framesMissing = static_cast<ssize_t>(frameCount) - framesRead;
            framesMissing > 0) {
            LOG(WARNING) << __func__ << ": incomplete data received, inserting " << framesMissing
                         << " frames of silence";
            memset(static_cast<char*>(buffer) + framesRead * mFrameSizeBytes, 0,
                   framesMissing * mFrameSizeBytes);
        }
        maxLatency = proxy_get_latency(mAlsaDeviceProxies[i].get());
    } else {
        alsa::applyGain(buffer, mGain, bytesToTransfer, mConfig.value().format, mConfig->channels);
        for (auto& proxy : mAlsaDeviceProxies) {
            proxy_write_with_retries(proxy.get(), buffer, bytesToTransfer, mReadWriteRetries);
            maxLatency = std::max(maxLatency, proxy_get_latency(proxy.get()));
        for (size_t i = 0; i < mAlsaDeviceProxies.size(); ++i) {
            LOG(VERBOSE) << __func__ << ": writing into sink " << i;
            ssize_t framesWritten = mSinks[i]->write(buffer, frameCount);
            LOG_IF(FATAL, framesWritten < 0) << "Error writing into the pipe: " << framesWritten;
            if (ssize_t framesLost = static_cast<ssize_t>(frameCount) - framesWritten;
                framesLost > 0) {
                LOG(WARNING) << __func__ << ": sink " << i << " incomplete data sent, dropping "
                             << framesLost << " frames";
            }
            maxLatency = std::max(maxLatency, proxy_get_latency(mAlsaDeviceProxies[i].get()));
        }
    }
    *actualFrameCount = frameCount;
@@ -164,7 +230,7 @@ StreamAlsa::~StreamAlsa() {
}

void StreamAlsa::shutdown() {
    mAlsaDeviceProxies.clear();
    teardownIo();
}

ndk::ScopedAStatus StreamAlsa::setGain(float gain) {
@@ -172,4 +238,80 @@ ndk::ScopedAStatus StreamAlsa::setGain(float gain) {
    return ndk::ScopedAStatus::ok();
}

void StreamAlsa::inputIoThread(size_t idx) {
#if defined(__ANDROID__)
    setWorkerThreadPriority(pthread_gettid_np(pthread_self()));
    const std::string threadName = (std::string("in_") + std::to_string(idx)).substr(0, 15);
    pthread_setname_np(pthread_self(), threadName.c_str());
#endif
    const size_t bufferSize = mBufferSizeFrames * mFrameSizeBytes;
    std::vector<char> buffer(bufferSize);
    while (mIoThreadIsRunning) {
        if (int ret = proxy_read_with_retries(mAlsaDeviceProxies[idx].get(), &buffer[0], bufferSize,
                                              mReadWriteRetries);
            ret == 0) {
            size_t bufferFramesWritten = 0;
            while (bufferFramesWritten < mBufferSizeFrames) {
                if (!mIoThreadIsRunning) return;
                ssize_t framesWrittenOrError =
                        mSinks[idx]->write(&buffer[0], mBufferSizeFrames - bufferFramesWritten);
                if (framesWrittenOrError >= 0) {
                    bufferFramesWritten += framesWrittenOrError;
                } else {
                    LOG(WARNING) << __func__ << "[" << idx
                                 << "]: Error while writing into the pipe: "
                                 << framesWrittenOrError;
                }
            }
        } else {
            // Errors when the stream is being stopped are expected.
            LOG_IF(WARNING, mIoThreadIsRunning)
                    << __func__ << "[" << idx << "]: Error reading from ALSA: " << ret;
        }
    }
}

void StreamAlsa::outputIoThread(size_t idx) {
#if defined(__ANDROID__)
    setWorkerThreadPriority(pthread_gettid_np(pthread_self()));
    const std::string threadName = (std::string("out_") + std::to_string(idx)).substr(0, 15);
    pthread_setname_np(pthread_self(), threadName.c_str());
#endif
    const size_t bufferSize = mBufferSizeFrames * mFrameSizeBytes;
    std::vector<char> buffer(bufferSize);
    while (mIoThreadIsRunning) {
        ssize_t framesRead = mSources[idx]->read(&buffer[0], mBufferSizeFrames);
        if (framesRead > 0) {
            int ret = proxy_write_with_retries(mAlsaDeviceProxies[idx].get(), &buffer[0],
                                               framesRead * mFrameSizeBytes, mReadWriteRetries);
            // Errors when the stream is being stopped are expected.
            LOG_IF(WARNING, ret != 0 && mIoThreadIsRunning)
                    << __func__ << "[" << idx << "]: Error writing into ALSA: " << ret;
        }
    }
}

void StreamAlsa::teardownIo() {
    mIoThreadIsRunning = false;
    if (mIsInput) {
        LOG(DEBUG) << __func__ << ": shutting down pipes";
        for (auto& sink : mSinks) {
            sink->shutdown(true);
        }
    }
    LOG(DEBUG) << __func__ << ": stopping PCM streams";
    for (const auto& proxy : mAlsaDeviceProxies) {
        proxy_stop(proxy.get());
    }
    LOG(DEBUG) << __func__ << ": joining threads";
    for (auto& thread : mIoThreads) {
        if (thread.joinable()) thread.join();
    }
    mIoThreads.clear();
    LOG(DEBUG) << __func__ << ": closing PCM devices";
    mAlsaDeviceProxies.clear();
    mSources.clear();
    mSinks.clear();
}

}  // namespace aidl::android::hardware::audio::core
+2 −2
Original line number Diff line number Diff line
@@ -48,8 +48,8 @@ class DeviceProxy {
  public:
    DeviceProxy();  // Constructs a "null" proxy.
    explicit DeviceProxy(const DeviceProfile& deviceProfile);
    alsa_device_profile* getProfile() { return mProfile.get(); }
    alsa_device_proxy* get() { return mProxy.get(); }
    alsa_device_profile* getProfile() const { return mProfile.get(); }
    alsa_device_proxy* get() const { return mProxy.get(); }

  private:
    static void alsaProxyDeleter(alsa_device_proxy* proxy);
+1 −0
Original line number Diff line number Diff line
@@ -475,6 +475,7 @@ class StreamCommonImpl : virtual public StreamCommonInterface, virtual public Dr
    // the destructor in order to stop and join the worker thread in the case when the client
    // has not called 'IStreamCommon::close' method.
    void cleanupWorker();
    void setWorkerThreadPriority(pid_t workerTid);
    void stopAndJoinWorker();
    void stopWorker();

+20 −2
Original line number Diff line number Diff line
@@ -16,9 +16,14 @@

#pragma once

#include <atomic>
#include <optional>
#include <thread>
#include <vector>

#include <media/nbaio/MonoPipe.h>
#include <media/nbaio/MonoPipeReader.h>

#include "Stream.h"
#include "alsa/Utils.h"

@@ -57,11 +62,24 @@ class StreamAlsa : public StreamCommonImpl {
    const bool mIsInput;
    const std::optional<struct pcm_config> mConfig;
    const int mReadWriteRetries;
    // All fields below are only used on the worker thread.
    std::vector<alsa::DeviceProxy> mAlsaDeviceProxies;

  private:
    ::android::NBAIO_Format getPipeFormat() const;
    ::android::sp<::android::MonoPipe> makeSink(bool writeCanBlock);
    ::android::sp<::android::MonoPipeReader> makeSource(::android::MonoPipe* pipe);
    void inputIoThread(size_t idx);
    void outputIoThread(size_t idx);
    void teardownIo();

    std::atomic<float> mGain = 1.0;

    // All fields below are only used on the worker thread.
    std::vector<alsa::DeviceProxy> mAlsaDeviceProxies;
    // Only 'libnbaio_mono' is vendor-accessible, thus no access to the multi-reader Pipe.
    std::vector<::android::sp<::android::MonoPipe>> mSinks;
    std::vector<::android::sp<::android::MonoPipeReader>> mSources;
    std::vector<std::thread> mIoThreads;
    std::atomic<bool> mIoThreadIsRunning = false;  // used by all threads
};

}  // namespace aidl::android::hardware::audio::core