Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3241dddc authored by Mikhail Naganov's avatar Mikhail Naganov Committed by Android (Google) Code Review
Browse files

Merge "audiohal: Make sure audio data transfer related commands go via FMQ"

parents e1c8daf1 c8381909
Loading
Loading
Loading
Loading
+147 −87
Original line number Diff line number Diff line
@@ -14,8 +14,6 @@
 * limitations under the License.
 */

#include <time.h>

#define LOG_TAG "StreamHalHidl"
//#define LOG_NDEBUG 0

@@ -40,6 +38,7 @@ using ::android::hardware::audio::V2_0::TimeSpec;
using ::android::hardware::MQDescriptorSync;
using ::android::hardware::Return;
using ::android::hardware::Void;
using ReadCommand = ::android::hardware::audio::V2_0::IStreamIn::ReadCommand;

namespace android {

@@ -241,8 +240,7 @@ struct StreamOutCallback : public IStreamOutCallback {
}  // namespace

StreamOutHalHidl::StreamOutHalHidl(const sp<IStreamOut>& stream)
        : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr),
          mGetPresentationPositionNotSupported(false), mPPosFromWrite{ 0, OK, 0, { 0, 0 } } {
        : StreamHalHidl(stream.get()), mStream(stream), mWriterClient(0), mEfGroup(nullptr) {
}

StreamOutHalHidl::~StreamOutHalHidl() {
@@ -265,8 +263,16 @@ status_t StreamOutHalHidl::getFrameSize(size_t *size) {

status_t StreamOutHalHidl::getLatency(uint32_t *latency) {
    if (mStream == 0) return NO_INIT;
    if (mWriterClient == gettid() && mCommandMQ) {
        return callWriterThread(
                WriteCommand::GET_LATENCY, "getLatency", nullptr, 0,
                [&](const WriteStatus& writeStatus) {
                    *latency = writeStatus.reply.latencyMs;
                });
    } else {
        return processReturn("getLatency", mStream->getLatency(), latency);
    }
}

status_t StreamOutHalHidl::setVolume(float left, float right) {
    if (mStream == 0) return NO_INIT;
@@ -288,10 +294,30 @@ status_t StreamOutHalHidl::write(const void *buffer, size_t bytes, size_t *writt
        return status;
    }

    const size_t availBytes = mDataMQ->availableToWrite();
    if (bytes > availBytes) { bytes = availBytes; }
    if (!mDataMQ->write(static_cast<const uint8_t*>(buffer), bytes)) {
        ALOGW("data message queue write failed");
    return callWriterThread(
            WriteCommand::WRITE, "write", static_cast<const uint8_t*>(buffer), bytes,
            [&] (const WriteStatus& writeStatus) {
                *written = writeStatus.reply.written;
            });
}

status_t StreamOutHalHidl::callWriterThread(
        WriteCommand cmd, const char* cmdName,
        const uint8_t* data, size_t dataSize, StreamOutHalHidl::WriterCallback callback) {
    if (!mCommandMQ->write(&cmd)) {
        ALOGE("command message queue write failed for \"%s\"", cmdName);
        return -EAGAIN;
    }
    if (data != nullptr) {
        size_t availableToWrite = mDataMQ->availableToWrite();
        if (dataSize > availableToWrite) {
            ALOGW("truncating write data from %d to %d due to insufficient data queue space",
                    (int32_t)dataSize, (int32_t)availableToWrite);
            dataSize = availableToWrite;
        }
        if (!mDataMQ->write(data, dataSize)) {
            ALOGE("data message queue write failed for \"%s\"", cmdName);
        }
    }
    mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));

@@ -301,24 +327,18 @@ retry:
    status_t ret = mEfGroup->wait(
            static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState, NS_PER_SEC);
    if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)) {
        WriteStatus writeStatus =
                { Result::NOT_INITIALIZED, 0, Result::NOT_INITIALIZED, 0, { 0, 0 } };
        mStatusMQ->read(&writeStatus);
        if (writeStatus.writeRetval == Result::OK) {
            status = OK;
            *written = writeStatus.written;
            mPPosFromWrite.status = processReturn(
                    "get_presentation_position", writeStatus.presentationPositionRetval);
            if (mPPosFromWrite.status == OK) {
                mPPosFromWrite.frames = writeStatus.frames;
                mPPosFromWrite.ts.tv_sec = writeStatus.timeStamp.tvSec;
                mPPosFromWrite.ts.tv_nsec = writeStatus.timeStamp.tvNSec;
            }
            mPPosFromWrite.obtained = getCurrentTimeMs();
        WriteStatus writeStatus;
        writeStatus.retval = Result::NOT_INITIALIZED;
        if (!mStatusMQ->read(&writeStatus)) {
            ALOGE("status message read failed for \"%s\"", cmdName);
        }
        if (writeStatus.retval == Result::OK) {
            ret = OK;
            callback(writeStatus);
        } else {
            status = processReturn("write", writeStatus.writeRetval);
            ret = processReturn(cmdName, writeStatus.retval);
        }
        return status;
        return ret;
    }
    if (ret == -EAGAIN) {
        // This normally retries no more than once.
@@ -327,23 +347,20 @@ retry:
    return ret;
}

uint64_t StreamOutHalHidl::getCurrentTimeMs() {
    struct timespec timeNow;
    clock_gettime(CLOCK_MONOTONIC, &timeNow);
    return timeNow.tv_sec * 1000000 + timeNow.tv_nsec / 1000;
}

status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
    std::unique_ptr<CommandMQ> tempCommandMQ;
    std::unique_ptr<DataMQ> tempDataMQ;
    std::unique_ptr<StatusMQ> tempStatusMQ;
    Result retval;
    Return<void> ret = mStream->prepareForWriting(
            1, bufferSize, ThreadPriority(mHalThreadPriority),
            [&](Result r,
                    const CommandMQ::Descriptor& commandMQ,
                    const DataMQ::Descriptor& dataMQ,
                    const StatusMQ::Descriptor& statusMQ) {
                retval = r;
                if (retval == Result::OK) {
                    tempCommandMQ.reset(new CommandMQ(commandMQ));
                    tempDataMQ.reset(new DataMQ(dataMQ));
                    tempStatusMQ.reset(new StatusMQ(statusMQ));
                    if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
@@ -354,8 +371,13 @@ status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
    if (!ret.isOk() || retval != Result::OK) {
        return processReturn("prepareForWriting", ret, retval);
    }
    if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
        || !mEfGroup) {
    if (!tempCommandMQ || !tempCommandMQ->isValid() ||
            !tempDataMQ || !tempDataMQ->isValid() ||
            !tempStatusMQ || !tempStatusMQ->isValid() ||
            !mEfGroup) {
        ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
        ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
                "Command message queue for writing is invalid");
        ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for writing");
        ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for writing is invalid");
        ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for writing");
@@ -364,8 +386,10 @@ status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
        ALOGE_IF(!mEfGroup, "Event flag creation for writing failed");
        return NO_INIT;
    }
    mCommandMQ = std::move(tempCommandMQ);
    mDataMQ = std::move(tempDataMQ);
    mStatusMQ = std::move(tempStatusMQ);
    mWriterClient = gettid();
    return OK;
}

@@ -443,17 +467,15 @@ status_t StreamOutHalHidl::flush() {

status_t StreamOutHalHidl::getPresentationPosition(uint64_t *frames, struct timespec *timestamp) {
    if (mStream == 0) return NO_INIT;
    if (mGetPresentationPositionNotSupported) return INVALID_OPERATION;
    if (getCurrentTimeMs() - mPPosFromWrite.obtained <= 1000) {
        // No more than 1 ms passed since the last write, use cached result to avoid binder calls.
        if (mPPosFromWrite.status == OK) {
            *frames = mPPosFromWrite.frames;
            timestamp->tv_sec = mPPosFromWrite.ts.tv_sec;
            timestamp->tv_nsec = mPPosFromWrite.ts.tv_nsec;
        }
        return mPPosFromWrite.status;
    }

    if (mWriterClient == gettid() && mCommandMQ) {
        return callWriterThread(
                WriteCommand::GET_PRESENTATION_POSITION, "getPresentationPosition", nullptr, 0,
                [&](const WriteStatus& writeStatus) {
                    *frames = writeStatus.reply.presentationPosition.frames;
                    timestamp->tv_sec = writeStatus.reply.presentationPosition.timeStamp.tvSec;
                    timestamp->tv_nsec = writeStatus.reply.presentationPosition.timeStamp.tvNSec;
                });
    } else {
        Result retval;
        Return<void> ret = mStream->getPresentationPosition(
                [&](Result r, uint64_t hidlFrames, const TimeSpec& hidlTimeStamp) {
@@ -464,11 +486,9 @@ status_t StreamOutHalHidl::getPresentationPosition(uint64_t *frames, struct time
                        timestamp->tv_nsec = hidlTimeStamp.tvNSec;
                    }
                });
    if (ret.isOk() && retval == Result::NOT_SUPPORTED) {
        mGetPresentationPositionNotSupported = true;
    }
        return processReturn("getPresentationPosition", ret, retval);
    }
}

void StreamOutHalHidl::onWriteReady() {
    sp<StreamOutHalInterfaceCallback> callback = mCallback.promote();
@@ -493,7 +513,7 @@ void StreamOutHalHidl::onError() {


StreamInHalHidl::StreamInHalHidl(const sp<IStreamIn>& stream)
        : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr) {
        : StreamHalHidl(stream.get()), mStream(stream), mReaderClient(0), mEfGroup(nullptr) {
}

StreamInHalHidl::~StreamInHalHidl() {
@@ -525,11 +545,34 @@ status_t StreamInHalHidl::read(void *buffer, size_t bytes, size_t *read) {
    }

    status_t status;
    if (!mDataMQ) {
        if ((status = prepareForReading(bytes)) != OK) return status;
        // Trigger the first read.
        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
    if (!mDataMQ && (status = prepareForReading(bytes)) != OK) {
        return status;
    }

    ReadParameters params;
    params.command = ReadCommand::READ;
    params.params.read = bytes;
    return callReaderThread(params, "read",
            [&](const ReadStatus& readStatus) {
                const size_t availToRead = mDataMQ->availableToRead();
                if (!mDataMQ->read(static_cast<uint8_t*>(buffer), std::min(bytes, availToRead))) {
                    ALOGE("data message queue read failed for \"read\"");
                }
                ALOGW_IF(availToRead != readStatus.reply.read,
                        "HAL read report inconsistent: mq = %d, status = %d",
                        (int32_t)availToRead, (int32_t)readStatus.reply.read);
                *read = readStatus.reply.read;
            });
}

status_t StreamInHalHidl::callReaderThread(
        const ReadParameters& params, const char* cmdName,
        StreamInHalHidl::ReaderCallback callback) {
    if (!mCommandMQ->write(&params)) {
        ALOGW("command message queue write failed");
        return -EAGAIN;
    }
    mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));

    // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
    uint32_t efState = 0;
@@ -537,21 +580,18 @@ retry:
    status_t ret = mEfGroup->wait(
            static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState, NS_PER_SEC);
    if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)) {
        ReadStatus readStatus = { Result::NOT_INITIALIZED, 0 };
        const size_t availToRead = mDataMQ->availableToRead();
        if (bytes > availToRead) { bytes = availToRead; }
        mDataMQ->read(static_cast<uint8_t*>(buffer), bytes);
        mStatusMQ->read(&readStatus);
        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
        ReadStatus readStatus;
        readStatus.retval = Result::NOT_INITIALIZED;
        if (!mStatusMQ->read(&readStatus)) {
            ALOGE("status message read failed for \"%s\"", cmdName);
        }
         if (readStatus.retval == Result::OK) {
            ALOGW_IF(availToRead != readStatus.read,
                    "HAL read report inconsistent: mq = %d, status = %d",
                    (int32_t)availToRead, (int32_t)readStatus.read);
            *read = readStatus.read;
            ret = OK;
            callback(readStatus);
        } else {
            status = processReturn("read", readStatus.retval);
            ret = processReturn(cmdName, readStatus.retval);
        }
        return status;
        return ret;
    }
    if (ret == -EAGAIN) {
        // This normally retries no more than once.
@@ -561,16 +601,19 @@ retry:
}

status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
    std::unique_ptr<CommandMQ> tempCommandMQ;
    std::unique_ptr<DataMQ> tempDataMQ;
    std::unique_ptr<StatusMQ> tempStatusMQ;
    Result retval;
    Return<void> ret = mStream->prepareForReading(
            1, bufferSize, ThreadPriority(mHalThreadPriority),
            [&](Result r,
                    const CommandMQ::Descriptor& commandMQ,
                    const DataMQ::Descriptor& dataMQ,
                    const StatusMQ::Descriptor& statusMQ) {
                retval = r;
                if (retval == Result::OK) {
                    tempCommandMQ.reset(new CommandMQ(commandMQ));
                    tempDataMQ.reset(new DataMQ(dataMQ));
                    tempStatusMQ.reset(new StatusMQ(statusMQ));
                    if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
@@ -581,8 +624,13 @@ status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
    if (!ret.isOk() || retval != Result::OK) {
        return processReturn("prepareForReading", ret, retval);
    }
    if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
        || !mEfGroup) {
    if (!tempCommandMQ || !tempCommandMQ->isValid() ||
            !tempDataMQ || !tempDataMQ->isValid() ||
            !tempStatusMQ || !tempStatusMQ->isValid() ||
            !mEfGroup) {
        ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
        ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
                "Command message queue for writing is invalid");
        ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for reading");
        ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for reading is invalid");
        ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for reading");
@@ -591,8 +639,10 @@ status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
        ALOGE_IF(!mEfGroup, "Event flag creation for reading failed");
        return NO_INIT;
    }
    mCommandMQ = std::move(tempCommandMQ);
    mDataMQ = std::move(tempDataMQ);
    mStatusMQ = std::move(tempStatusMQ);
    mReaderClient = gettid();
    return OK;
}

@@ -603,6 +653,15 @@ status_t StreamInHalHidl::getInputFramesLost(uint32_t *framesLost) {

status_t StreamInHalHidl::getCapturePosition(int64_t *frames, int64_t *time) {
    if (mStream == 0) return NO_INIT;
    if (mReaderClient == gettid() && mCommandMQ) {
        ReadParameters params;
        params.command = ReadCommand::GET_CAPTURE_POSITION;
        return callReaderThread(params, "getCapturePosition",
                [&](const ReadStatus& readStatus) {
                    *frames = readStatus.reply.capturePosition.frames;
                    *time = readStatus.reply.capturePosition.time;
                });
    } else {
        Result retval;
        Return<void> ret = mStream->getCapturePosition(
                [&](Result r, uint64_t hidlFrames, uint64_t hidlTime) {
@@ -614,5 +673,6 @@ status_t StreamInHalHidl::getCapturePosition(int64_t *frames, int64_t *time) {
                });
        return processReturn("getCapturePosition", ret, retval);
    }
}

} // namespace android
+17 −8
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@
#ifndef ANDROID_HARDWARE_STREAM_HAL_HIDL_H
#define ANDROID_HARDWARE_STREAM_HAL_HIDL_H

#include <atomic>

#include <android/hardware/audio/2.0/IStream.h>
#include <android/hardware/audio/2.0/IStreamIn.h>
#include <android/hardware/audio/2.0/IStreamOut.h>
@@ -32,7 +34,9 @@ using ::android::hardware::audio::V2_0::IStreamOut;
using ::android::hardware::EventFlag;
using ::android::hardware::MessageQueue;
using ::android::hardware::Return;
using ReadParameters = ::android::hardware::audio::V2_0::IStreamIn::ReadParameters;
using ReadStatus = ::android::hardware::audio::V2_0::IStreamIn::ReadStatus;
using WriteCommand = ::android::hardware::audio::V2_0::IStreamOut::WriteCommand;
using WriteStatus = ::android::hardware::audio::V2_0::IStreamOut::WriteStatus;

namespace android {
@@ -155,28 +159,27 @@ class StreamOutHalHidl : public StreamOutHalInterface, public StreamHalHidl {

  private:
    friend class DeviceHalHidl;
    typedef MessageQueue<WriteCommand, hardware::kSynchronizedReadWrite> CommandMQ;
    typedef MessageQueue<uint8_t, hardware::kSynchronizedReadWrite> DataMQ;
    typedef MessageQueue<WriteStatus, hardware::kSynchronizedReadWrite> StatusMQ;

    wp<StreamOutHalInterfaceCallback> mCallback;
    sp<IStreamOut> mStream;
    std::unique_ptr<CommandMQ> mCommandMQ;
    std::unique_ptr<DataMQ> mDataMQ;
    std::unique_ptr<StatusMQ> mStatusMQ;
    std::atomic<pid_t> mWriterClient;
    EventFlag* mEfGroup;
    bool mGetPresentationPositionNotSupported;
    struct {
        uint64_t obtained;
        status_t status;
        uint64_t frames;
        struct timespec ts;
    } mPPosFromWrite;

    // Can not be constructed directly by clients.
    StreamOutHalHidl(const sp<IStreamOut>& stream);

    virtual ~StreamOutHalHidl();

    uint64_t getCurrentTimeMs();
    using WriterCallback = std::function<void(const WriteStatus& writeStatus)>;
    status_t callWriterThread(
            WriteCommand cmd, const char* cmdName,
            const uint8_t* data, size_t dataSize, WriterCallback callback);
    status_t prepareForWriting(size_t bufferSize);
};

@@ -200,12 +203,15 @@ class StreamInHalHidl : public StreamInHalInterface, public StreamHalHidl {

  private:
    friend class DeviceHalHidl;
    typedef MessageQueue<ReadParameters, hardware::kSynchronizedReadWrite> CommandMQ;
    typedef MessageQueue<uint8_t, hardware::kSynchronizedReadWrite> DataMQ;
    typedef MessageQueue<ReadStatus, hardware::kSynchronizedReadWrite> StatusMQ;

    sp<IStreamIn> mStream;
    std::unique_ptr<CommandMQ> mCommandMQ;
    std::unique_ptr<DataMQ> mDataMQ;
    std::unique_ptr<StatusMQ> mStatusMQ;
    std::atomic<pid_t> mReaderClient;
    EventFlag* mEfGroup;

    // Can not be constructed directly by clients.
@@ -213,6 +219,9 @@ class StreamInHalHidl : public StreamInHalInterface, public StreamHalHidl {

    virtual ~StreamInHalHidl();

    using ReaderCallback = std::function<void(const ReadStatus& readStatus)>;
    status_t callReaderThread(
            const ReadParameters& params, const char* cmdName, ReaderCallback callback);
    status_t prepareForReading(size_t bufferSize);
};