Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c8381909 authored by Mikhail Naganov's avatar Mikhail Naganov
Browse files

audiohal: Make sure audio data transfer related commands go via FMQ

When outputting audio, the framework issues several HAL calls
from the same thread that writes into data FMQ. These calls
also need to be served on the same thread that writes audio data
to HAL. The same thing happens when audio input is commenced.

Add a command FMQ for passing different commands to the HAL thread.
This way, depending on the calling thread, the same call may go
either via hwbinder or via the command queue.

This dramatically reduces jitter in RTT measurements (although
doesn't improve the latency).

Bug: 30222631
Test: scripted RTT app
Change-Id: Ic212e33d4e2b1cf404973fe9d60762523eef9f40
parent 67999b02
Loading
Loading
Loading
Loading
+147 −87
Original line number Diff line number Diff line
@@ -14,8 +14,6 @@
 * limitations under the License.
 */

#include <time.h>

#define LOG_TAG "StreamHalHidl"
//#define LOG_NDEBUG 0

@@ -40,6 +38,7 @@ using ::android::hardware::audio::V2_0::TimeSpec;
using ::android::hardware::MQDescriptorSync;
using ::android::hardware::Return;
using ::android::hardware::Void;
using ReadCommand = ::android::hardware::audio::V2_0::IStreamIn::ReadCommand;

namespace android {

@@ -241,8 +240,7 @@ struct StreamOutCallback : public IStreamOutCallback {
}  // namespace

StreamOutHalHidl::StreamOutHalHidl(const sp<IStreamOut>& stream)
        : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr),
          mGetPresentationPositionNotSupported(false), mPPosFromWrite{ 0, OK, 0, { 0, 0 } } {
        : StreamHalHidl(stream.get()), mStream(stream), mWriterClient(0), mEfGroup(nullptr) {
}

StreamOutHalHidl::~StreamOutHalHidl() {
@@ -265,8 +263,16 @@ status_t StreamOutHalHidl::getFrameSize(size_t *size) {

status_t StreamOutHalHidl::getLatency(uint32_t *latency) {
    if (mStream == 0) return NO_INIT;
    if (mWriterClient == gettid() && mCommandMQ) {
        return callWriterThread(
                WriteCommand::GET_LATENCY, "getLatency", nullptr, 0,
                [&](const WriteStatus& writeStatus) {
                    *latency = writeStatus.reply.latencyMs;
                });
    } else {
        return processReturn("getLatency", mStream->getLatency(), latency);
    }
}

status_t StreamOutHalHidl::setVolume(float left, float right) {
    if (mStream == 0) return NO_INIT;
@@ -288,10 +294,30 @@ status_t StreamOutHalHidl::write(const void *buffer, size_t bytes, size_t *writt
        return status;
    }

    const size_t availBytes = mDataMQ->availableToWrite();
    if (bytes > availBytes) { bytes = availBytes; }
    if (!mDataMQ->write(static_cast<const uint8_t*>(buffer), bytes)) {
        ALOGW("data message queue write failed");
    return callWriterThread(
            WriteCommand::WRITE, "write", static_cast<const uint8_t*>(buffer), bytes,
            [&] (const WriteStatus& writeStatus) {
                *written = writeStatus.reply.written;
            });
}

status_t StreamOutHalHidl::callWriterThread(
        WriteCommand cmd, const char* cmdName,
        const uint8_t* data, size_t dataSize, StreamOutHalHidl::WriterCallback callback) {
    if (!mCommandMQ->write(&cmd)) {
        ALOGE("command message queue write failed for \"%s\"", cmdName);
        return -EAGAIN;
    }
    if (data != nullptr) {
        size_t availableToWrite = mDataMQ->availableToWrite();
        if (dataSize > availableToWrite) {
            ALOGW("truncating write data from %d to %d due to insufficient data queue space",
                    (int32_t)dataSize, (int32_t)availableToWrite);
            dataSize = availableToWrite;
        }
        if (!mDataMQ->write(data, dataSize)) {
            ALOGE("data message queue write failed for \"%s\"", cmdName);
        }
    }
    mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));

@@ -301,24 +327,18 @@ retry:
    status_t ret = mEfGroup->wait(
            static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState, NS_PER_SEC);
    if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)) {
        WriteStatus writeStatus =
                { Result::NOT_INITIALIZED, 0, Result::NOT_INITIALIZED, 0, { 0, 0 } };
        mStatusMQ->read(&writeStatus);
        if (writeStatus.writeRetval == Result::OK) {
            status = OK;
            *written = writeStatus.written;
            mPPosFromWrite.status = processReturn(
                    "get_presentation_position", writeStatus.presentationPositionRetval);
            if (mPPosFromWrite.status == OK) {
                mPPosFromWrite.frames = writeStatus.frames;
                mPPosFromWrite.ts.tv_sec = writeStatus.timeStamp.tvSec;
                mPPosFromWrite.ts.tv_nsec = writeStatus.timeStamp.tvNSec;
            }
            mPPosFromWrite.obtained = getCurrentTimeMs();
        WriteStatus writeStatus;
        writeStatus.retval = Result::NOT_INITIALIZED;
        if (!mStatusMQ->read(&writeStatus)) {
            ALOGE("status message read failed for \"%s\"", cmdName);
        }
        if (writeStatus.retval == Result::OK) {
            ret = OK;
            callback(writeStatus);
        } else {
            status = processReturn("write", writeStatus.writeRetval);
            ret = processReturn(cmdName, writeStatus.retval);
        }
        return status;
        return ret;
    }
    if (ret == -EAGAIN) {
        // This normally retries no more than once.
@@ -327,23 +347,20 @@ retry:
    return ret;
}

uint64_t StreamOutHalHidl::getCurrentTimeMs() {
    struct timespec timeNow;
    clock_gettime(CLOCK_MONOTONIC, &timeNow);
    return timeNow.tv_sec * 1000000 + timeNow.tv_nsec / 1000;
}

status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
    std::unique_ptr<CommandMQ> tempCommandMQ;
    std::unique_ptr<DataMQ> tempDataMQ;
    std::unique_ptr<StatusMQ> tempStatusMQ;
    Result retval;
    Return<void> ret = mStream->prepareForWriting(
            1, bufferSize, ThreadPriority(mHalThreadPriority),
            [&](Result r,
                    const CommandMQ::Descriptor& commandMQ,
                    const DataMQ::Descriptor& dataMQ,
                    const StatusMQ::Descriptor& statusMQ) {
                retval = r;
                if (retval == Result::OK) {
                    tempCommandMQ.reset(new CommandMQ(commandMQ));
                    tempDataMQ.reset(new DataMQ(dataMQ));
                    tempStatusMQ.reset(new StatusMQ(statusMQ));
                    if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
@@ -354,8 +371,13 @@ status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
    if (!ret.isOk() || retval != Result::OK) {
        return processReturn("prepareForWriting", ret, retval);
    }
    if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
        || !mEfGroup) {
    if (!tempCommandMQ || !tempCommandMQ->isValid() ||
            !tempDataMQ || !tempDataMQ->isValid() ||
            !tempStatusMQ || !tempStatusMQ->isValid() ||
            !mEfGroup) {
        ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
        ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
                "Command message queue for writing is invalid");
        ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for writing");
        ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for writing is invalid");
        ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for writing");
@@ -364,8 +386,10 @@ status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
        ALOGE_IF(!mEfGroup, "Event flag creation for writing failed");
        return NO_INIT;
    }
    mCommandMQ = std::move(tempCommandMQ);
    mDataMQ = std::move(tempDataMQ);
    mStatusMQ = std::move(tempStatusMQ);
    mWriterClient = gettid();
    return OK;
}

@@ -443,17 +467,15 @@ status_t StreamOutHalHidl::flush() {

status_t StreamOutHalHidl::getPresentationPosition(uint64_t *frames, struct timespec *timestamp) {
    if (mStream == 0) return NO_INIT;
    if (mGetPresentationPositionNotSupported) return INVALID_OPERATION;
    if (getCurrentTimeMs() - mPPosFromWrite.obtained <= 1000) {
        // No more than 1 ms passed since the last write, use cached result to avoid binder calls.
        if (mPPosFromWrite.status == OK) {
            *frames = mPPosFromWrite.frames;
            timestamp->tv_sec = mPPosFromWrite.ts.tv_sec;
            timestamp->tv_nsec = mPPosFromWrite.ts.tv_nsec;
        }
        return mPPosFromWrite.status;
    }

    if (mWriterClient == gettid() && mCommandMQ) {
        return callWriterThread(
                WriteCommand::GET_PRESENTATION_POSITION, "getPresentationPosition", nullptr, 0,
                [&](const WriteStatus& writeStatus) {
                    *frames = writeStatus.reply.presentationPosition.frames;
                    timestamp->tv_sec = writeStatus.reply.presentationPosition.timeStamp.tvSec;
                    timestamp->tv_nsec = writeStatus.reply.presentationPosition.timeStamp.tvNSec;
                });
    } else {
        Result retval;
        Return<void> ret = mStream->getPresentationPosition(
                [&](Result r, uint64_t hidlFrames, const TimeSpec& hidlTimeStamp) {
@@ -464,11 +486,9 @@ status_t StreamOutHalHidl::getPresentationPosition(uint64_t *frames, struct time
                        timestamp->tv_nsec = hidlTimeStamp.tvNSec;
                    }
                });
    if (ret.isOk() && retval == Result::NOT_SUPPORTED) {
        mGetPresentationPositionNotSupported = true;
    }
        return processReturn("getPresentationPosition", ret, retval);
    }
}

void StreamOutHalHidl::onWriteReady() {
    sp<StreamOutHalInterfaceCallback> callback = mCallback.promote();
@@ -493,7 +513,7 @@ void StreamOutHalHidl::onError() {


StreamInHalHidl::StreamInHalHidl(const sp<IStreamIn>& stream)
        : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr) {
        : StreamHalHidl(stream.get()), mStream(stream), mReaderClient(0), mEfGroup(nullptr) {
}

StreamInHalHidl::~StreamInHalHidl() {
@@ -525,11 +545,34 @@ status_t StreamInHalHidl::read(void *buffer, size_t bytes, size_t *read) {
    }

    status_t status;
    if (!mDataMQ) {
        if ((status = prepareForReading(bytes)) != OK) return status;
        // Trigger the first read.
        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
    if (!mDataMQ && (status = prepareForReading(bytes)) != OK) {
        return status;
    }

    ReadParameters params;
    params.command = ReadCommand::READ;
    params.params.read = bytes;
    return callReaderThread(params, "read",
            [&](const ReadStatus& readStatus) {
                const size_t availToRead = mDataMQ->availableToRead();
                if (!mDataMQ->read(static_cast<uint8_t*>(buffer), std::min(bytes, availToRead))) {
                    ALOGE("data message queue read failed for \"read\"");
                }
                ALOGW_IF(availToRead != readStatus.reply.read,
                        "HAL read report inconsistent: mq = %d, status = %d",
                        (int32_t)availToRead, (int32_t)readStatus.reply.read);
                *read = readStatus.reply.read;
            });
}

status_t StreamInHalHidl::callReaderThread(
        const ReadParameters& params, const char* cmdName,
        StreamInHalHidl::ReaderCallback callback) {
    if (!mCommandMQ->write(&params)) {
        ALOGW("command message queue write failed");
        return -EAGAIN;
    }
    mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));

    // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
    uint32_t efState = 0;
@@ -537,21 +580,18 @@ retry:
    status_t ret = mEfGroup->wait(
            static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState, NS_PER_SEC);
    if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)) {
        ReadStatus readStatus = { Result::NOT_INITIALIZED, 0 };
        const size_t availToRead = mDataMQ->availableToRead();
        if (bytes > availToRead) { bytes = availToRead; }
        mDataMQ->read(static_cast<uint8_t*>(buffer), bytes);
        mStatusMQ->read(&readStatus);
        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
        ReadStatus readStatus;
        readStatus.retval = Result::NOT_INITIALIZED;
        if (!mStatusMQ->read(&readStatus)) {
            ALOGE("status message read failed for \"%s\"", cmdName);
        }
         if (readStatus.retval == Result::OK) {
            ALOGW_IF(availToRead != readStatus.read,
                    "HAL read report inconsistent: mq = %d, status = %d",
                    (int32_t)availToRead, (int32_t)readStatus.read);
            *read = readStatus.read;
            ret = OK;
            callback(readStatus);
        } else {
            status = processReturn("read", readStatus.retval);
            ret = processReturn(cmdName, readStatus.retval);
        }
        return status;
        return ret;
    }
    if (ret == -EAGAIN) {
        // This normally retries no more than once.
@@ -561,16 +601,19 @@ retry:
}

status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
    std::unique_ptr<CommandMQ> tempCommandMQ;
    std::unique_ptr<DataMQ> tempDataMQ;
    std::unique_ptr<StatusMQ> tempStatusMQ;
    Result retval;
    Return<void> ret = mStream->prepareForReading(
            1, bufferSize, ThreadPriority(mHalThreadPriority),
            [&](Result r,
                    const CommandMQ::Descriptor& commandMQ,
                    const DataMQ::Descriptor& dataMQ,
                    const StatusMQ::Descriptor& statusMQ) {
                retval = r;
                if (retval == Result::OK) {
                    tempCommandMQ.reset(new CommandMQ(commandMQ));
                    tempDataMQ.reset(new DataMQ(dataMQ));
                    tempStatusMQ.reset(new StatusMQ(statusMQ));
                    if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
@@ -581,8 +624,13 @@ status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
    if (!ret.isOk() || retval != Result::OK) {
        return processReturn("prepareForReading", ret, retval);
    }
    if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
        || !mEfGroup) {
    if (!tempCommandMQ || !tempCommandMQ->isValid() ||
            !tempDataMQ || !tempDataMQ->isValid() ||
            !tempStatusMQ || !tempStatusMQ->isValid() ||
            !mEfGroup) {
        ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
        ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
                "Command message queue for writing is invalid");
        ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for reading");
        ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for reading is invalid");
        ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for reading");
@@ -591,8 +639,10 @@ status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
        ALOGE_IF(!mEfGroup, "Event flag creation for reading failed");
        return NO_INIT;
    }
    mCommandMQ = std::move(tempCommandMQ);
    mDataMQ = std::move(tempDataMQ);
    mStatusMQ = std::move(tempStatusMQ);
    mReaderClient = gettid();
    return OK;
}

@@ -603,6 +653,15 @@ status_t StreamInHalHidl::getInputFramesLost(uint32_t *framesLost) {

status_t StreamInHalHidl::getCapturePosition(int64_t *frames, int64_t *time) {
    if (mStream == 0) return NO_INIT;
    if (mReaderClient == gettid() && mCommandMQ) {
        ReadParameters params;
        params.command = ReadCommand::GET_CAPTURE_POSITION;
        return callReaderThread(params, "getCapturePosition",
                [&](const ReadStatus& readStatus) {
                    *frames = readStatus.reply.capturePosition.frames;
                    *time = readStatus.reply.capturePosition.time;
                });
    } else {
        Result retval;
        Return<void> ret = mStream->getCapturePosition(
                [&](Result r, uint64_t hidlFrames, uint64_t hidlTime) {
@@ -614,5 +673,6 @@ status_t StreamInHalHidl::getCapturePosition(int64_t *frames, int64_t *time) {
                });
        return processReturn("getCapturePosition", ret, retval);
    }
}

} // namespace android
+17 −8
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@
#ifndef ANDROID_HARDWARE_STREAM_HAL_HIDL_H
#define ANDROID_HARDWARE_STREAM_HAL_HIDL_H

#include <atomic>

#include <android/hardware/audio/2.0/IStream.h>
#include <android/hardware/audio/2.0/IStreamIn.h>
#include <android/hardware/audio/2.0/IStreamOut.h>
@@ -32,7 +34,9 @@ using ::android::hardware::audio::V2_0::IStreamOut;
using ::android::hardware::EventFlag;
using ::android::hardware::MessageQueue;
using ::android::hardware::Return;
using ReadParameters = ::android::hardware::audio::V2_0::IStreamIn::ReadParameters;
using ReadStatus = ::android::hardware::audio::V2_0::IStreamIn::ReadStatus;
using WriteCommand = ::android::hardware::audio::V2_0::IStreamOut::WriteCommand;
using WriteStatus = ::android::hardware::audio::V2_0::IStreamOut::WriteStatus;

namespace android {
@@ -155,28 +159,27 @@ class StreamOutHalHidl : public StreamOutHalInterface, public StreamHalHidl {

  private:
    friend class DeviceHalHidl;
    typedef MessageQueue<WriteCommand, hardware::kSynchronizedReadWrite> CommandMQ;
    typedef MessageQueue<uint8_t, hardware::kSynchronizedReadWrite> DataMQ;
    typedef MessageQueue<WriteStatus, hardware::kSynchronizedReadWrite> StatusMQ;

    wp<StreamOutHalInterfaceCallback> mCallback;
    sp<IStreamOut> mStream;
    std::unique_ptr<CommandMQ> mCommandMQ;
    std::unique_ptr<DataMQ> mDataMQ;
    std::unique_ptr<StatusMQ> mStatusMQ;
    std::atomic<pid_t> mWriterClient;
    EventFlag* mEfGroup;
    bool mGetPresentationPositionNotSupported;
    struct {
        uint64_t obtained;
        status_t status;
        uint64_t frames;
        struct timespec ts;
    } mPPosFromWrite;

    // Can not be constructed directly by clients.
    StreamOutHalHidl(const sp<IStreamOut>& stream);

    virtual ~StreamOutHalHidl();

    uint64_t getCurrentTimeMs();
    using WriterCallback = std::function<void(const WriteStatus& writeStatus)>;
    status_t callWriterThread(
            WriteCommand cmd, const char* cmdName,
            const uint8_t* data, size_t dataSize, WriterCallback callback);
    status_t prepareForWriting(size_t bufferSize);
};

@@ -200,12 +203,15 @@ class StreamInHalHidl : public StreamInHalInterface, public StreamHalHidl {

  private:
    friend class DeviceHalHidl;
    typedef MessageQueue<ReadParameters, hardware::kSynchronizedReadWrite> CommandMQ;
    typedef MessageQueue<uint8_t, hardware::kSynchronizedReadWrite> DataMQ;
    typedef MessageQueue<ReadStatus, hardware::kSynchronizedReadWrite> StatusMQ;

    sp<IStreamIn> mStream;
    std::unique_ptr<CommandMQ> mCommandMQ;
    std::unique_ptr<DataMQ> mDataMQ;
    std::unique_ptr<StatusMQ> mStatusMQ;
    std::atomic<pid_t> mReaderClient;
    EventFlag* mEfGroup;

    // Can not be constructed directly by clients.
@@ -213,6 +219,9 @@ class StreamInHalHidl : public StreamInHalInterface, public StreamHalHidl {

    virtual ~StreamInHalHidl();

    using ReaderCallback = std::function<void(const ReadStatus& readStatus)>;
    status_t callReaderThread(
            const ReadParameters& params, const char* cmdName, ReaderCallback callback);
    status_t prepareForReading(size_t bufferSize);
};