Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1eedc130 authored by Mikhail Naganov's avatar Mikhail Naganov
Browse files

audio: Move frame counter to StreamContext

Moving frame counter to the StreamContext class enables
switching stream drivers on the fly while keeping the frame
count monotonically increasing.

StreamWorkerCommonLogic now holds a pointer to StreamContext,
which makes redundant storing copies of the fields of the latter.

Bug: 264712385
Test: atest VtsHalAudioCoreTargetTest
Change-Id: If6716f4051c484b52927cbfe4032df7c907eb3a5
parent 780fefb3
Loading
Loading
Loading
Loading
+45 −38
Original line number Diff line number Diff line
@@ -91,17 +91,18 @@ void StreamContext::reset() {
}

std::string StreamWorkerCommonLogic::init() {
    if (mCommandMQ == nullptr) return "Command MQ is null";
    if (mReplyMQ == nullptr) return "Reply MQ is null";
    if (mDataMQ == nullptr) return "Data MQ is null";
    if (sizeof(DataBufferElement) != mDataMQ->getQuantumSize()) {
        return "Unexpected Data MQ quantum size: " + std::to_string(mDataMQ->getQuantumSize());
    }
    mDataBufferSize = mDataMQ->getQuantumCount() * mDataMQ->getQuantumSize();
    if (mContext->getCommandMQ() == nullptr) return "Command MQ is null";
    if (mContext->getReplyMQ() == nullptr) return "Reply MQ is null";
    StreamContext::DataMQ* const dataMQ = mContext->getDataMQ();
    if (dataMQ == nullptr) return "Data MQ is null";
    if (sizeof(DataBufferElement) != dataMQ->getQuantumSize()) {
        return "Unexpected Data MQ quantum size: " + std::to_string(dataMQ->getQuantumSize());
    }
    mDataBufferSize = dataMQ->getQuantumCount() * dataMQ->getQuantumSize();
    mDataBuffer.reset(new (std::nothrow) DataBufferElement[mDataBufferSize]);
    if (mDataBuffer == nullptr) {
        return "Failed to allocate data buffer for element count " +
               std::to_string(mDataMQ->getQuantumCount()) +
               std::to_string(dataMQ->getQuantumCount()) +
               ", size in bytes: " + std::to_string(mDataBufferSize);
    }
    if (::android::status_t status = mDriver->init(); status != STATUS_OK) {
@@ -114,7 +115,7 @@ void StreamWorkerCommonLogic::populateReply(StreamDescriptor::Reply* reply,
                                            bool isConnected) const {
    reply->status = STATUS_OK;
    if (isConnected) {
        reply->observable.frames = mFrameCount;
        reply->observable.frames = mContext->getFrameCount();
        reply->observable.timeNs = ::android::elapsedRealtimeNano();
        if (auto status = mDriver->getPosition(&reply->observable); status == ::android::OK) {
            return;
@@ -141,7 +142,7 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
    // TODO: Add a delay for transitions of async operations when/if they added.

    StreamDescriptor::Command command{};
    if (!mCommandMQ->readBlocking(&command, 1)) {
    if (!mContext->getCommandMQ()->readBlocking(&command, 1)) {
        LOG(ERROR) << __func__ << ": reading of command from MQ failed";
        mState = StreamDescriptor::State::ERROR;
        return Status::ABORT;
@@ -159,7 +160,7 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
    switch (command.getTag()) {
        case Tag::halReservedExit:
            if (const int32_t cookie = command.get<Tag::halReservedExit>();
                cookie == mInternalCommandCookie) {
                cookie == mContext->getInternalCommandCookie()) {
                mDriver->shutdown();
                setClosed();
                // This is an internal command, no need to reply.
@@ -277,7 +278,7 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
    }
    reply.state = mState;
    LOG(severity) << __func__ << ": writing reply " << reply.toString();
    if (!mReplyMQ->writeBlocking(&reply, 1)) {
    if (!mContext->getReplyMQ()->writeBlocking(&reply, 1)) {
        LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
        mState = StreamDescriptor::State::ERROR;
        return Status::ABORT;
@@ -286,14 +287,16 @@ StreamInWorkerLogic::Status StreamInWorkerLogic::cycle() {
}

bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply) {
    const size_t byteCount = std::min({clientSize, mDataMQ->availableToWrite(), mDataBufferSize});
    StreamContext::DataMQ* const dataMQ = mContext->getDataMQ();
    const size_t byteCount = std::min({clientSize, dataMQ->availableToWrite(), mDataBufferSize});
    const bool isConnected = mIsConnected;
    const size_t frameSize = mContext->getFrameSize();
    size_t actualFrameCount = 0;
    bool fatal = false;
    int32_t latency = Module::kLatencyMs;
    if (isConnected) {
        if (::android::status_t status = mDriver->transfer(
                    mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency);
        if (::android::status_t status = mDriver->transfer(mDataBuffer.get(), byteCount / frameSize,
                                                           &actualFrameCount, &latency);
            status != ::android::OK) {
            fatal = true;
            LOG(ERROR) << __func__ << ": read failed: " << status;
@@ -301,17 +304,16 @@ bool StreamInWorkerLogic::read(size_t clientSize, StreamDescriptor::Reply* reply
    } else {
        usleep(3000);  // Simulate blocking transfer delay.
        for (size_t i = 0; i < byteCount; ++i) mDataBuffer[i] = 0;
        actualFrameCount = byteCount / mFrameSize;
        actualFrameCount = byteCount / frameSize;
    }
    const size_t actualByteCount = actualFrameCount * mFrameSize;
    if (bool success =
                actualByteCount > 0 ? mDataMQ->write(&mDataBuffer[0], actualByteCount) : true;
    const size_t actualByteCount = actualFrameCount * frameSize;
    if (bool success = actualByteCount > 0 ? dataMQ->write(&mDataBuffer[0], actualByteCount) : true;
        success) {
        LOG(VERBOSE) << __func__ << ": writing of " << actualByteCount << " bytes into data MQ"
                     << " succeeded; connected? " << isConnected;
        // Frames are provided and counted regardless of connection status.
        reply->fmqByteCount += actualByteCount;
        mFrameCount += actualFrameCount;
        mContext->advanceFrameCount(actualFrameCount);
        populateReply(reply, isConnected);
    } else {
        LOG(WARNING) << __func__ << ": writing of " << actualByteCount
@@ -330,7 +332,8 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
        if (auto stateDurationMs = std::chrono::duration_cast<std::chrono::milliseconds>(
                    std::chrono::steady_clock::now() - mTransientStateStart);
            stateDurationMs >= mTransientStateDelayMs) {
            if (mAsyncCallback == nullptr) {
            std::shared_ptr<IStreamCallback> asyncCallback = mContext->getAsyncCallback();
            if (asyncCallback == nullptr) {
                // In blocking mode, mState can only be DRAINING.
                mState = StreamDescriptor::State::IDLE;
            } else {
@@ -338,13 +341,13 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
                // drain or transfer completion. In the stub, we switch unconditionally.
                if (mState == StreamDescriptor::State::DRAINING) {
                    mState = StreamDescriptor::State::IDLE;
                    ndk::ScopedAStatus status = mAsyncCallback->onDrainReady();
                    ndk::ScopedAStatus status = asyncCallback->onDrainReady();
                    if (!status.isOk()) {
                        LOG(ERROR) << __func__ << ": error from onDrainReady: " << status;
                    }
                } else {
                    mState = StreamDescriptor::State::ACTIVE;
                    ndk::ScopedAStatus status = mAsyncCallback->onTransferReady();
                    ndk::ScopedAStatus status = asyncCallback->onTransferReady();
                    if (!status.isOk()) {
                        LOG(ERROR) << __func__ << ": error from onTransferReady: " << status;
                    }
@@ -358,7 +361,7 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
    }

    StreamDescriptor::Command command{};
    if (!mCommandMQ->readBlocking(&command, 1)) {
    if (!mContext->getCommandMQ()->readBlocking(&command, 1)) {
        LOG(ERROR) << __func__ << ": reading of command from MQ failed";
        mState = StreamDescriptor::State::ERROR;
        return Status::ABORT;
@@ -377,7 +380,7 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
    switch (command.getTag()) {
        case Tag::halReservedExit:
            if (const int32_t cookie = command.get<Tag::halReservedExit>();
                cookie == mInternalCommandCookie) {
                cookie == mContext->getInternalCommandCookie()) {
                mDriver->shutdown();
                setClosed();
                // This is an internal command, no need to reply.
@@ -432,10 +435,11 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
                    if (!write(fmqByteCount, &reply)) {
                        mState = StreamDescriptor::State::ERROR;
                    }
                    std::shared_ptr<IStreamCallback> asyncCallback = mContext->getAsyncCallback();
                    if (mState == StreamDescriptor::State::STANDBY ||
                        mState == StreamDescriptor::State::DRAIN_PAUSED ||
                        mState == StreamDescriptor::State::PAUSED) {
                        if (mAsyncCallback == nullptr ||
                        if (asyncCallback == nullptr ||
                            mState != StreamDescriptor::State::DRAIN_PAUSED) {
                            mState = StreamDescriptor::State::PAUSED;
                        } else {
@@ -444,7 +448,7 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
                    } else if (mState == StreamDescriptor::State::IDLE ||
                               mState == StreamDescriptor::State::DRAINING ||
                               mState == StreamDescriptor::State::ACTIVE) {
                        if (mAsyncCallback == nullptr || reply.fmqByteCount == fmqByteCount) {
                        if (asyncCallback == nullptr || reply.fmqByteCount == fmqByteCount) {
                            mState = StreamDescriptor::State::ACTIVE;
                        } else {
                            switchToTransientState(StreamDescriptor::State::TRANSFERRING);
@@ -466,7 +470,8 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
                    if (::android::status_t status = mDriver->drain(mode);
                        status == ::android::OK) {
                        populateReply(&reply, mIsConnected);
                        if (mState == StreamDescriptor::State::ACTIVE && mForceSynchronousDrain) {
                        if (mState == StreamDescriptor::State::ACTIVE &&
                            mContext->getForceSynchronousDrain()) {
                            mState = StreamDescriptor::State::IDLE;
                        } else {
                            switchToTransientState(StreamDescriptor::State::DRAINING);
@@ -541,7 +546,7 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
    }
    reply.state = mState;
    LOG(severity) << __func__ << ": writing reply " << reply.toString();
    if (!mReplyMQ->writeBlocking(&reply, 1)) {
    if (!mContext->getReplyMQ()->writeBlocking(&reply, 1)) {
        LOG(ERROR) << __func__ << ": writing of reply " << reply.toString() << " to MQ failed";
        mState = StreamDescriptor::State::ERROR;
        return Status::ABORT;
@@ -550,38 +555,40 @@ StreamOutWorkerLogic::Status StreamOutWorkerLogic::cycle() {
}

bool StreamOutWorkerLogic::write(size_t clientSize, StreamDescriptor::Reply* reply) {
    const size_t readByteCount = mDataMQ->availableToRead();
    StreamContext::DataMQ* const dataMQ = mContext->getDataMQ();
    const size_t readByteCount = dataMQ->availableToRead();
    const size_t frameSize = mContext->getFrameSize();
    bool fatal = false;
    int32_t latency = Module::kLatencyMs;
    if (bool success = readByteCount > 0 ? mDataMQ->read(&mDataBuffer[0], readByteCount) : true) {
    if (bool success = readByteCount > 0 ? dataMQ->read(&mDataBuffer[0], readByteCount) : true) {
        const bool isConnected = mIsConnected;
        LOG(VERBOSE) << __func__ << ": reading of " << readByteCount << " bytes from data MQ"
                     << " succeeded; connected? " << isConnected;
        // Amount of data that the HAL module is going to actually use.
        size_t byteCount = std::min({clientSize, readByteCount, mDataBufferSize});
        if (byteCount >= mFrameSize && mForceTransientBurst) {
        if (byteCount >= frameSize && mContext->getForceTransientBurst()) {
            // In order to prevent the state machine from going to ACTIVE state,
            // simulate partial write.
            byteCount -= mFrameSize;
            byteCount -= frameSize;
        }
        size_t actualFrameCount = 0;
        if (isConnected) {
            if (::android::status_t status = mDriver->transfer(
                        mDataBuffer.get(), byteCount / mFrameSize, &actualFrameCount, &latency);
                        mDataBuffer.get(), byteCount / frameSize, &actualFrameCount, &latency);
                status != ::android::OK) {
                fatal = true;
                LOG(ERROR) << __func__ << ": write failed: " << status;
            }
        } else {
            if (mAsyncCallback == nullptr) {
            if (mContext->getAsyncCallback() == nullptr) {
                usleep(3000);  // Simulate blocking transfer delay.
            }
            actualFrameCount = byteCount / mFrameSize;
            actualFrameCount = byteCount / frameSize;
        }
        const size_t actualByteCount = actualFrameCount * mFrameSize;
        const size_t actualByteCount = actualFrameCount * frameSize;
        // Frames are consumed and counted regardless of the connection status.
        reply->fmqByteCount += actualByteCount;
        mFrameCount += actualFrameCount;
        mContext->advanceFrameCount(actualFrameCount);
        populateReply(reply, isConnected);
    } else {
        LOG(WARNING) << __func__ << ": reading of " << readByteCount
+1 −1
Original line number Diff line number Diff line
@@ -27,7 +27,7 @@

namespace aidl::android::hardware::audio::core {

StreamAlsa::StreamAlsa(const StreamContext& context, const Metadata& metadata, int readWriteRetries)
StreamAlsa::StreamAlsa(StreamContext* context, const Metadata& metadata, int readWriteRetries)
    : StreamCommonImpl(context, metadata),
      mFrameSizeBytes(getContext().getFrameSize()),
      mIsInput(isInput(metadata)),
+29 −33
Original line number Diff line number Diff line
@@ -113,7 +113,8 @@ class StreamContext {
          mDataMQ(std::move(other.mDataMQ)),
          mAsyncCallback(std::move(other.mAsyncCallback)),
          mOutEventCallback(std::move(other.mOutEventCallback)),
          mDebugParameters(std::move(other.mDebugParameters)) {}
          mDebugParameters(std::move(other.mDebugParameters)),
          mFrameCount(other.mFrameCount) {}
    StreamContext& operator=(StreamContext&& other) {
        mCommandMQ = std::move(other.mCommandMQ);
        mInternalCommandCookie = other.mInternalCommandCookie;
@@ -128,6 +129,7 @@ class StreamContext {
        mAsyncCallback = std::move(other.mAsyncCallback);
        mOutEventCallback = std::move(other.mOutEventCallback);
        mDebugParameters = std::move(other.mDebugParameters);
        mFrameCount = other.mFrameCount;
        return *this;
    }

@@ -156,7 +158,12 @@ class StreamContext {
    int getTransientStateDelayMs() const { return mDebugParameters.transientStateDelayMs; }
    int getSampleRate() const { return mSampleRate; }
    bool isValid() const;
    // 'reset' is called on a Binder thread when closing the stream. Does not use
    // locking because it only cleans MQ pointers which were also set on the Binder thread.
    void reset();
    // 'advanceFrameCount' and 'getFrameCount' are only called on the worker thread.
    long advanceFrameCount(size_t increase) { return mFrameCount += increase; }
    long getFrameCount() const { return mFrameCount; }

  private:
    std::unique_ptr<CommandMQ> mCommandMQ;
@@ -172,6 +179,7 @@ class StreamContext {
    std::shared_ptr<IStreamCallback> mAsyncCallback;
    std::shared_ptr<IStreamOutEventCallback> mOutEventCallback;  // Only used by output streams
    DebugParameters mDebugParameters;
    long mFrameCount = 0;
};

// This interface provides operations of the stream which are executed on the worker thread.
@@ -206,17 +214,10 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea
  protected:
    using DataBufferElement = int8_t;

    StreamWorkerCommonLogic(const StreamContext& context, DriverInterface* driver)
        : mDriver(driver),
          mInternalCommandCookie(context.getInternalCommandCookie()),
          mFrameSize(context.getFrameSize()),
          mCommandMQ(context.getCommandMQ()),
          mReplyMQ(context.getReplyMQ()),
          mDataMQ(context.getDataMQ()),
          mAsyncCallback(context.getAsyncCallback()),
          mTransientStateDelayMs(context.getTransientStateDelayMs()),
          mForceTransientBurst(context.getForceTransientBurst()),
          mForceSynchronousDrain(context.getForceSynchronousDrain()) {}
    StreamWorkerCommonLogic(StreamContext* context, DriverInterface* driver)
        : mContext(context),
          mDriver(driver),
          mTransientStateDelayMs(context->getTransientStateDelayMs()) {}
    std::string init() override;
    void populateReply(StreamDescriptor::Reply* reply, bool isConnected) const;
    void populateReplyWrongState(StreamDescriptor::Reply* reply,
@@ -226,34 +227,28 @@ class StreamWorkerCommonLogic : public ::android::hardware::audio::common::Strea
        mTransientStateStart = std::chrono::steady_clock::now();
    }

    // The context is only used for reading, except for updating the frame count,
    // which happens on the worker thread only.
    StreamContext* const mContext;
    DriverInterface* const mDriver;
    // Atomic fields are used both by the main and worker threads.
    std::atomic<bool> mIsConnected = false;
    static_assert(std::atomic<StreamDescriptor::State>::is_always_lock_free);
    std::atomic<StreamDescriptor::State> mState = StreamDescriptor::State::STANDBY;
    // All fields are used on the worker thread only.
    const int mInternalCommandCookie;
    const size_t mFrameSize;
    StreamContext::CommandMQ* const mCommandMQ;
    StreamContext::ReplyMQ* const mReplyMQ;
    StreamContext::DataMQ* const mDataMQ;
    std::shared_ptr<IStreamCallback> mAsyncCallback;
    // All fields below are used on the worker thread only.
    const std::chrono::duration<int, std::milli> mTransientStateDelayMs;
    std::chrono::time_point<std::chrono::steady_clock> mTransientStateStart;
    const bool mForceTransientBurst;
    const bool mForceSynchronousDrain;
    // We use an array and the "size" field instead of a vector to be able to detect
    // memory allocation issues.
    std::unique_ptr<DataBufferElement[]> mDataBuffer;
    size_t mDataBufferSize;
    long mFrameCount = 0;
};

// This interface is used to decouple stream implementations from a concrete StreamWorker
// implementation.
struct StreamWorkerInterface {
    using CreateInstance = std::function<StreamWorkerInterface*(const StreamContext& context,
                                                                DriverInterface* driver)>;
    using CreateInstance =
            std::function<StreamWorkerInterface*(StreamContext* context, DriverInterface* driver)>;
    virtual ~StreamWorkerInterface() = default;
    virtual bool isClosed() const = 0;
    virtual void setIsConnected(bool isConnected) = 0;
@@ -268,7 +263,7 @@ class StreamWorkerImpl : public StreamWorkerInterface,
    using WorkerImpl = ::android::hardware::audio::common::StreamWorker<WorkerLogic>;

  public:
    StreamWorkerImpl(const StreamContext& context, DriverInterface* driver)
    StreamWorkerImpl(StreamContext* context, DriverInterface* driver)
        : WorkerImpl(context, driver) {}
    bool isClosed() const override { return WorkerImpl::isClosed(); }
    void setIsConnected(bool isConnected) override { WorkerImpl::setIsConnected(isConnected); }
@@ -282,7 +277,7 @@ class StreamWorkerImpl : public StreamWorkerInterface,
class StreamInWorkerLogic : public StreamWorkerCommonLogic {
  public:
    static const std::string kThreadName;
    StreamInWorkerLogic(const StreamContext& context, DriverInterface* driver)
    StreamInWorkerLogic(StreamContext* context, DriverInterface* driver)
        : StreamWorkerCommonLogic(context, driver) {}

  protected:
@@ -296,8 +291,9 @@ using StreamInWorker = StreamWorkerImpl<StreamInWorkerLogic>;
class StreamOutWorkerLogic : public StreamWorkerCommonLogic {
  public:
    static const std::string kThreadName;
    StreamOutWorkerLogic(const StreamContext& context, DriverInterface* driver)
        : StreamWorkerCommonLogic(context, driver), mEventCallback(context.getOutEventCallback()) {}
    StreamOutWorkerLogic(StreamContext* context, DriverInterface* driver)
        : StreamWorkerCommonLogic(context, driver),
          mEventCallback(context->getOutEventCallback()) {}

  protected:
    Status cycle() override;
@@ -416,10 +412,10 @@ class StreamCommonDelegator : public BnStreamCommon {
// who must be owner of the context.
class StreamCommonImpl : virtual public StreamCommonInterface, virtual public DriverInterface {
  public:
    StreamCommonImpl(const StreamContext& context, const Metadata& metadata,
    StreamCommonImpl(StreamContext* context, const Metadata& metadata,
                     const StreamWorkerInterface::CreateInstance& createWorker)
        : mContext(context), mMetadata(metadata), mWorker(createWorker(mContext, this)) {}
    StreamCommonImpl(const StreamContext& context, const Metadata& metadata)
        : mContext(*context), mMetadata(metadata), mWorker(createWorker(context, this)) {}
    StreamCommonImpl(StreamContext* context, const Metadata& metadata)
        : StreamCommonImpl(
                  context, metadata,
                  isInput(metadata) ? getDefaultInWorkerCreator() : getDefaultOutWorkerCreator()) {}
@@ -453,12 +449,12 @@ class StreamCommonImpl : virtual public StreamCommonInterface, virtual public Dr

  protected:
    static StreamWorkerInterface::CreateInstance getDefaultInWorkerCreator() {
        return [](const StreamContext& ctx, DriverInterface* driver) -> StreamWorkerInterface* {
        return [](StreamContext* ctx, DriverInterface* driver) -> StreamWorkerInterface* {
            return new StreamInWorker(ctx, driver);
        };
    }
    static StreamWorkerInterface::CreateInstance getDefaultOutWorkerCreator() {
        return [](const StreamContext& ctx, DriverInterface* driver) -> StreamWorkerInterface* {
        return [](StreamContext* ctx, DriverInterface* driver) -> StreamWorkerInterface* {
            return new StreamOutWorker(ctx, driver);
        };
    }
+1 −1
Original line number Diff line number Diff line
@@ -31,7 +31,7 @@ namespace aidl::android::hardware::audio::core {
// provide necessary overrides for all interface methods omitted here.
class StreamAlsa : public StreamCommonImpl {
  public:
    StreamAlsa(const StreamContext& context, const Metadata& metadata, int readWriteRetries);
    StreamAlsa(StreamContext* context, const Metadata& metadata, int readWriteRetries);
    // Methods of 'DriverInterface'.
    ::android::status_t init() override;
    ::android::status_t drain(StreamDescriptor::DrainMode) override;
+1 −1
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ using aidl::android::hardware::audio::core::r_submix::SubmixRoute;

class StreamRemoteSubmix : public StreamCommonImpl {
  public:
    StreamRemoteSubmix(const StreamContext& context, const Metadata& metadata);
    StreamRemoteSubmix(StreamContext* context, const Metadata& metadata);

    ::android::status_t init() override;
    ::android::status_t drain(StreamDescriptor::DrainMode) override;
Loading