Loading audio/2.0/IStreamIn.hal +41 −7 Original line number Diff line number Diff line Loading @@ -40,6 +40,26 @@ interface IStreamIn extends IStream { */ setGain(float gain) generates (Result retval); /* * Commands that can be executed on the driver reader thread. */ enum ReadCommand : int32_t { READ, GET_CAPTURE_POSITION }; /* * Data structure passed to the driver for executing commands * on the driver reader thread. */ struct ReadParameters { ReadCommand command; // discriminator union Params { uint64_t read; // READ command, amount of bytes to read, >= 0. // No parameters for GET_CAPTURE_POSITION. } params; }; /* * Data structure passed back to the client via status message queue * of 'read' operation. Loading @@ -51,24 +71,36 @@ interface IStreamIn extends IStream { */ struct ReadStatus { Result retval; uint64_t read; ReadCommand replyTo; // discriminator union Reply { uint64_t read; // READ command, amount of bytes read, >= 0. struct CapturePosition { // same as generated by getCapturePosition. uint64_t frames; uint64_t time; } capturePosition; } reply; }; /* * Set up required transports for receiving audio buffers from the driver. * * The transport consists of two message queues: one is used for passing * audio data from the driver to the client, another is used for reporting * read operation status (amount of bytes actually read or error code), * see ReadStatus structure definition. * The transport consists of three message queues: * -- command queue is used to instruct the reader thread what operation * to perform; * -- data queue is used for passing audio data from the driver * to the client; * -- status queue is used for reporting operation status * (e.g. amount of bytes actually read or error code). * The driver operates on a dedicated thread. * * @param frameSize the size of a single frame, in bytes. * @param framesCount the number of frames in a buffer. * @param threadPriority priority of the thread that performs reads. * @param threadPriority priority of the driver thread. * @return retval OK if both message queues were created successfully. * INVALID_STATE if the method was already called. * INVALID_ARGUMENTS if there was a problem setting up * the queues. * @return commandMQ a message queue used for passing commands. * @return dataMQ a message queue used for passing audio data in the format * specified at the stream opening. * @return statusMQ a message queue used for passing status from the driver Loading @@ -79,7 +111,9 @@ interface IStreamIn extends IStream { ThreadPriority threadPriority) generates ( Result retval, fmq_sync<uint8_t> dataMQ, fmq_sync<ReadStatus> statusMQ); fmq_sync<ReadParameters> commandMQ, fmq_sync<uint8_t> dataMQ, fmq_sync<ReadStatus> statusMQ); /* * Return the amount of input frames lost in the audio driver since the last Loading audio/2.0/IStreamOut.hal +35 −20 Original line number Diff line number Diff line Loading @@ -43,45 +43,58 @@ interface IStreamOut extends IStream { */ setVolume(float left, float right) generates (Result retval); /* * Commands that can be executed on the driver writer thread. */ enum WriteCommand : int32_t { WRITE, GET_PRESENTATION_POSITION, GET_LATENCY }; /* * Data structure passed back to the client via status message queue * of 'write' operation. * * Possible values of 'writeRetval' field: * Possible values of 'retval' field: * - OK, write operation was successful; * - INVALID_ARGUMENTS, stream was not configured properly; * - INVALID_STATE, stream is in a state that doesn't allow writes. * * Possible values of 'presentationPositionRetval' field (must only * be considered if 'writeRetval' field is set to 'OK'): * - OK, presentation position retrieved successfully; * - INVALID_ARGUMENTS, indicates that the position can't be retrieved; * - INVALID_OPERATION, retrieving presentation position isn't supported; * - INVALID_STATE, stream is in a state that doesn't allow writes; * - INVALID_OPERATION, retrieving presentation position isn't supported. */ struct WriteStatus { Result writeRetval; uint64_t written; Result presentationPositionRetval; uint64_t frames; // presentation position TimeSpec timeStamp; // presentation position Result retval; WriteCommand replyTo; // discriminator union Reply { uint64_t written; // WRITE command, amount of bytes written, >= 0. struct PresentationPosition { // same as generated by uint64_t frames; // getPresentationPosition. TimeSpec timeStamp; } presentationPosition; uint32_t latencyMs; // Same as generated by getLatency. } reply; }; /* * Set up required transports for passing audio buffers to the driver. * * The transport consists of two message queues: one is used for passing * audio data from the client to the driver, another is used for reporting * write operation status (amount of bytes actually written or error code), * and the presentation position immediately after the write, see * WriteStatus structure definition. * The transport consists of three message queues: * -- command queue is used to instruct the writer thread what operation * to perform; * -- data queue is used for passing audio data from the client * to the driver; * -- status queue is used for reporting operation status * (e.g. amount of bytes actually written or error code). * The driver operates on a dedicated thread. * * @param frameSize the size of a single frame, in bytes. * @param framesCount the number of frames in a buffer. * @param threadPriority priority of the thread that performs writes. * @param threadPriority priority of the driver thread. * @return retval OK if both message queues were created successfully. * INVALID_STATE if the method was already called. * INVALID_ARGUMENTS if there was a problem setting up * the queues. * @return commandMQ a message queue used for passing commands. * @return dataMQ a message queue used for passing audio data in the format * specified at the stream opening. * @return statusMQ a message queue used for passing status from the driver Loading @@ -92,7 +105,9 @@ interface IStreamOut extends IStream { ThreadPriority threadPriority) generates ( Result retval, fmq_sync<uint8_t> dataMQ, fmq_sync<WriteStatus> statusMQ); fmq_sync<WriteCommand> commandMQ, fmq_sync<uint8_t> dataMQ, fmq_sync<WriteStatus> statusMQ); /* * Return the number of audio frames written by the audio DSP to DAC since Loading audio/2.0/default/StreamIn.cpp +80 −34 Original line number Diff line number Diff line Loading @@ -38,6 +38,7 @@ class ReadThread : public Thread { // ReadThread's lifespan never exceeds StreamIn's lifespan. ReadThread(std::atomic<bool>* stop, audio_stream_in_t* stream, StreamIn::CommandMQ* commandMQ, StreamIn::DataMQ* dataMQ, StreamIn::StatusMQ* statusMQ, EventFlag* efGroup, Loading @@ -45,6 +46,7 @@ class ReadThread : public Thread { : Thread(false /*canCallJava*/), mStop(stop), mStream(stream), mCommandMQ(commandMQ), mDataMQ(dataMQ), mStatusMQ(statusMQ), mEfGroup(efGroup), Loading @@ -58,13 +60,19 @@ class ReadThread : public Thread { private: std::atomic<bool>* mStop; audio_stream_in_t* mStream; StreamIn::CommandMQ* mCommandMQ; StreamIn::DataMQ* mDataMQ; StreamIn::StatusMQ* mStatusMQ; EventFlag* mEfGroup; ThreadPriority mThreadPriority; std::unique_ptr<uint8_t[]> mBuffer; IStreamIn::ReadParameters mParameters; IStreamIn::ReadStatus mStatus; bool threadLoop() override; void doGetCapturePosition(); void doRead(); }; status_t ReadThread::readyToRun() { Loading @@ -77,6 +85,32 @@ status_t ReadThread::readyToRun() { return OK; } void ReadThread::doRead() { size_t availableToWrite = mDataMQ->availableToWrite(); size_t requestedToRead = mParameters.params.read; if (requestedToRead > availableToWrite) { ALOGW("truncating read data from %d to %d due to insufficient data queue space", (int32_t)requestedToRead, (int32_t)availableToWrite); requestedToRead = availableToWrite; } ssize_t readResult = mStream->read(mStream, &mBuffer[0], requestedToRead); mStatus.retval = Result::OK; uint64_t read = 0; if (readResult >= 0) { mStatus.reply.read = readResult; if (!mDataMQ->write(&mBuffer[0], readResult)) { ALOGW("data message queue write failed"); } } else { mStatus.retval = Stream::analyzeStatus("read", readResult); } } void ReadThread::doGetCapturePosition() { mStatus.retval = StreamIn::getCapturePositionImpl( mStream, &mStatus.reply.capturePosition.frames, &mStatus.reply.capturePosition.time); } bool ReadThread::threadLoop() { // This implementation doesn't return control back to the Thread until it decides to stop, // as the Thread uses mutexes, and this can lead to priority inversion. Loading @@ -87,21 +121,23 @@ bool ReadThread::threadLoop() { if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) { continue; // Nothing to do. } const size_t availToWrite = mDataMQ->availableToWrite(); ssize_t readResult = mStream->read(mStream, &mBuffer[0], availToWrite); Result retval = Result::OK; uint64_t read = 0; if (readResult >= 0) { read = readResult; if (!mDataMQ->write(&mBuffer[0], readResult)) { ALOGW("data message queue write failed"); } } else { retval = Stream::analyzeStatus("read", readResult); if (!mCommandMQ->read(&mParameters)) { continue; // Nothing to do. } IStreamIn::ReadStatus status = { retval, read }; if (!mStatusMQ->write(&status)) { mStatus.replyTo = mParameters.command; switch (mParameters.command) { case IStreamIn::ReadCommand::READ: doRead(); break; case IStreamIn::ReadCommand::GET_CAPTURE_POSITION: doGetCapturePosition(); break; default: ALOGE("Unknown read thread command code %d", mParameters.command); mStatus.retval = Result::NOT_SUPPORTED; break; } if (!mStatusMQ->write(&mStatus)) { ALOGW("status message queue write failed"); } mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)); Loading Loading @@ -275,17 +311,19 @@ Return<void> StreamIn::prepareForReading( if (mDataMQ) { ALOGE("the client attempts to call prepareForReading twice"); _hidl_cb(Result::INVALID_STATE, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1)); std::unique_ptr<DataMQ> tempDataMQ( new DataMQ(frameSize * framesCount, true /* EventFlag */)); std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1)); if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) { if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) { ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid"); ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid"); ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid"); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } // TODO: Remove event flag management once blocking MQ is implemented. b/33815422 Loading @@ -293,7 +331,7 @@ Return<void> StreamIn::prepareForReading( if (status != OK || !mEfGroup) { ALOGE("failed creating event flag for data MQ: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } Loading @@ -301,6 +339,7 @@ Return<void> StreamIn::prepareForReading( mReadThread = new ReadThread( &mStopReadThread, mStream, tempCommandMQ.get(), tempDataMQ.get(), tempStatusMQ.get(), mEfGroup, Loading @@ -309,13 +348,14 @@ Return<void> StreamIn::prepareForReading( if (status != OK) { ALOGW("failed to start reader thread: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } mCommandMQ = std::move(tempCommandMQ); mDataMQ = std::move(tempDataMQ); mStatusMQ = std::move(tempStatusMQ); _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc()); _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc()); return Void(); } Loading @@ -323,22 +363,28 @@ Return<uint32_t> StreamIn::getInputFramesLost() { return mStream->get_input_frames_lost(mStream); } Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) { // static Result StreamIn::getCapturePositionImpl( audio_stream_in_t *stream, uint64_t *frames, uint64_t *time) { Result retval(Result::NOT_SUPPORTED); uint64_t frames = 0, time = 0; if (mStream->get_capture_position != NULL) { if (stream->get_capture_position != NULL) return retval; int64_t halFrames, halTime; retval = Stream::analyzeStatus( "get_capture_position", mStream->get_capture_position(mStream, &halFrames, &halTime), stream->get_capture_position(stream, &halFrames, &halTime), // HAL may have a stub function, always returning ENOSYS, don't // spam the log in this case. ENOSYS); if (retval == Result::OK) { frames = halFrames; time = halTime; } *frames = halFrames; *time = halTime; } return retval; }; Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) { uint64_t frames = 0, time = 0; Result retval = getCapturePositionImpl(mStream, &frames, &time); _hidl_cb(retval, frames, time); return Void(); } Loading audio/2.0/default/StreamIn.h +5 −0 Original line number Diff line number Diff line Loading @@ -52,6 +52,7 @@ using ::android::hardware::hidl_string; using ::android::sp; struct StreamIn : public IStreamIn { typedef MessageQueue<ReadParameters, kSynchronizedReadWrite> CommandMQ; typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ; typedef MessageQueue<ReadStatus, kSynchronizedReadWrite> StatusMQ; Loading Loading @@ -97,12 +98,16 @@ struct StreamIn : public IStreamIn { Return<void> createMmapBuffer(int32_t minSizeFrames, createMmapBuffer_cb _hidl_cb) override; Return<void> getMmapPosition(getMmapPosition_cb _hidl_cb) override; static Result getCapturePositionImpl( audio_stream_in_t *stream, uint64_t *frames, uint64_t *time); private: bool mIsClosed; audio_hw_device_t *mDevice; audio_stream_in_t *mStream; sp<Stream> mStreamCommon; sp<StreamMmap<audio_stream_in_t>> mStreamMmap; std::unique_ptr<CommandMQ> mCommandMQ; std::unique_ptr<DataMQ> mDataMQ; std::unique_ptr<StatusMQ> mStatusMQ; EventFlag* mEfGroup; Loading audio/2.0/default/StreamOut.cpp +63 −23 Original line number Diff line number Diff line Loading @@ -36,6 +36,7 @@ class WriteThread : public Thread { // WriteThread's lifespan never exceeds StreamOut's lifespan. WriteThread(std::atomic<bool>* stop, audio_stream_out_t* stream, StreamOut::CommandMQ* commandMQ, StreamOut::DataMQ* dataMQ, StreamOut::StatusMQ* statusMQ, EventFlag* efGroup, Loading @@ -43,6 +44,7 @@ class WriteThread : public Thread { : Thread(false /*canCallJava*/), mStop(stop), mStream(stream), mCommandMQ(commandMQ), mDataMQ(dataMQ), mStatusMQ(statusMQ), mEfGroup(efGroup), Loading @@ -56,13 +58,19 @@ class WriteThread : public Thread { private: std::atomic<bool>* mStop; audio_stream_out_t* mStream; StreamOut::CommandMQ* mCommandMQ; StreamOut::DataMQ* mDataMQ; StreamOut::StatusMQ* mStatusMQ; EventFlag* mEfGroup; ThreadPriority mThreadPriority; std::unique_ptr<uint8_t[]> mBuffer; IStreamOut::WriteStatus mStatus; bool threadLoop() override; void doGetLatency(); void doGetPresentationPosition(); void doWrite(); }; status_t WriteThread::readyToRun() { Loading @@ -75,6 +83,32 @@ status_t WriteThread::readyToRun() { return OK; } void WriteThread::doWrite() { const size_t availToRead = mDataMQ->availableToRead(); mStatus.retval = Result::OK; mStatus.reply.written = 0; if (mDataMQ->read(&mBuffer[0], availToRead)) { ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead); if (writeResult >= 0) { mStatus.reply.written = writeResult; } else { mStatus.retval = Stream::analyzeStatus("write", writeResult); } } } void WriteThread::doGetPresentationPosition() { mStatus.retval = StreamOut::getPresentationPositionImpl( mStream, &mStatus.reply.presentationPosition.frames, &mStatus.reply.presentationPosition.timeStamp); } void WriteThread::doGetLatency() { mStatus.retval = Result::OK; mStatus.reply.latencyMs = mStream->get_latency(mStream); } bool WriteThread::threadLoop() { // This implementation doesn't return control back to the Thread until it decides to stop, // as the Thread uses mutexes, and this can lead to priority inversion. Loading @@ -86,24 +120,26 @@ bool WriteThread::threadLoop() { if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) { continue; // Nothing to do. } const size_t availToRead = mDataMQ->availableToRead(); IStreamOut::WriteStatus status; status.writeRetval = Result::OK; status.written = 0; if (mDataMQ->read(&mBuffer[0], availToRead)) { ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead); if (writeResult >= 0) { status.written = writeResult; } else { status.writeRetval = Stream::analyzeStatus("write", writeResult); if (!mCommandMQ->read(&mStatus.replyTo)) { continue; // Nothing to do. } switch (mStatus.replyTo) { case IStreamOut::WriteCommand::WRITE: doWrite(); break; case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION: doGetPresentationPosition(); break; case IStreamOut::WriteCommand::GET_LATENCY: doGetLatency(); break; default: ALOGE("Unknown write thread command code %d", mStatus.replyTo); mStatus.retval = Result::NOT_SUPPORTED; break; } status.presentationPositionRetval = status.writeRetval == Result::OK ? StreamOut::getPresentationPositionImpl(mStream, &status.frames, &status.timeStamp) : Result::OK; if (!mStatusMQ->write(&status)) { ALOGW("status message queue write failed"); if (!mStatusMQ->write(&mStatus)) { ALOGE("status message queue write failed"); } mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)); } Loading Loading @@ -259,17 +295,19 @@ Return<void> StreamOut::prepareForWriting( if (mDataMQ) { ALOGE("the client attempts to call prepareForWriting twice"); _hidl_cb(Result::INVALID_STATE, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1)); std::unique_ptr<DataMQ> tempDataMQ( new DataMQ(frameSize * framesCount, true /* EventFlag */)); std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1)); if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) { if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) { ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid"); ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid"); ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid"); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } // TODO: Remove event flag management once blocking MQ is implemented. b/33815422 Loading @@ -277,7 +315,7 @@ Return<void> StreamOut::prepareForWriting( if (status != OK || !mEfGroup) { ALOGE("failed creating event flag for data MQ: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } Loading @@ -285,6 +323,7 @@ Return<void> StreamOut::prepareForWriting( mWriteThread = new WriteThread( &mStopWriteThread, mStream, tempCommandMQ.get(), tempDataMQ.get(), tempStatusMQ.get(), mEfGroup, Loading @@ -293,13 +332,14 @@ Return<void> StreamOut::prepareForWriting( if (status != OK) { ALOGW("failed to start writer thread: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } mCommandMQ = std::move(tempCommandMQ); mDataMQ = std::move(tempDataMQ); mStatusMQ = std::move(tempStatusMQ); _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc()); _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc()); return Void(); } Loading Loading
audio/2.0/IStreamIn.hal +41 −7 Original line number Diff line number Diff line Loading @@ -40,6 +40,26 @@ interface IStreamIn extends IStream { */ setGain(float gain) generates (Result retval); /* * Commands that can be executed on the driver reader thread. */ enum ReadCommand : int32_t { READ, GET_CAPTURE_POSITION }; /* * Data structure passed to the driver for executing commands * on the driver reader thread. */ struct ReadParameters { ReadCommand command; // discriminator union Params { uint64_t read; // READ command, amount of bytes to read, >= 0. // No parameters for GET_CAPTURE_POSITION. } params; }; /* * Data structure passed back to the client via status message queue * of 'read' operation. Loading @@ -51,24 +71,36 @@ interface IStreamIn extends IStream { */ struct ReadStatus { Result retval; uint64_t read; ReadCommand replyTo; // discriminator union Reply { uint64_t read; // READ command, amount of bytes read, >= 0. struct CapturePosition { // same as generated by getCapturePosition. uint64_t frames; uint64_t time; } capturePosition; } reply; }; /* * Set up required transports for receiving audio buffers from the driver. * * The transport consists of two message queues: one is used for passing * audio data from the driver to the client, another is used for reporting * read operation status (amount of bytes actually read or error code), * see ReadStatus structure definition. * The transport consists of three message queues: * -- command queue is used to instruct the reader thread what operation * to perform; * -- data queue is used for passing audio data from the driver * to the client; * -- status queue is used for reporting operation status * (e.g. amount of bytes actually read or error code). * The driver operates on a dedicated thread. * * @param frameSize the size of a single frame, in bytes. * @param framesCount the number of frames in a buffer. * @param threadPriority priority of the thread that performs reads. * @param threadPriority priority of the driver thread. * @return retval OK if both message queues were created successfully. * INVALID_STATE if the method was already called. * INVALID_ARGUMENTS if there was a problem setting up * the queues. * @return commandMQ a message queue used for passing commands. * @return dataMQ a message queue used for passing audio data in the format * specified at the stream opening. * @return statusMQ a message queue used for passing status from the driver Loading @@ -79,7 +111,9 @@ interface IStreamIn extends IStream { ThreadPriority threadPriority) generates ( Result retval, fmq_sync<uint8_t> dataMQ, fmq_sync<ReadStatus> statusMQ); fmq_sync<ReadParameters> commandMQ, fmq_sync<uint8_t> dataMQ, fmq_sync<ReadStatus> statusMQ); /* * Return the amount of input frames lost in the audio driver since the last Loading
audio/2.0/IStreamOut.hal +35 −20 Original line number Diff line number Diff line Loading @@ -43,45 +43,58 @@ interface IStreamOut extends IStream { */ setVolume(float left, float right) generates (Result retval); /* * Commands that can be executed on the driver writer thread. */ enum WriteCommand : int32_t { WRITE, GET_PRESENTATION_POSITION, GET_LATENCY }; /* * Data structure passed back to the client via status message queue * of 'write' operation. * * Possible values of 'writeRetval' field: * Possible values of 'retval' field: * - OK, write operation was successful; * - INVALID_ARGUMENTS, stream was not configured properly; * - INVALID_STATE, stream is in a state that doesn't allow writes. * * Possible values of 'presentationPositionRetval' field (must only * be considered if 'writeRetval' field is set to 'OK'): * - OK, presentation position retrieved successfully; * - INVALID_ARGUMENTS, indicates that the position can't be retrieved; * - INVALID_OPERATION, retrieving presentation position isn't supported; * - INVALID_STATE, stream is in a state that doesn't allow writes; * - INVALID_OPERATION, retrieving presentation position isn't supported. */ struct WriteStatus { Result writeRetval; uint64_t written; Result presentationPositionRetval; uint64_t frames; // presentation position TimeSpec timeStamp; // presentation position Result retval; WriteCommand replyTo; // discriminator union Reply { uint64_t written; // WRITE command, amount of bytes written, >= 0. struct PresentationPosition { // same as generated by uint64_t frames; // getPresentationPosition. TimeSpec timeStamp; } presentationPosition; uint32_t latencyMs; // Same as generated by getLatency. } reply; }; /* * Set up required transports for passing audio buffers to the driver. * * The transport consists of two message queues: one is used for passing * audio data from the client to the driver, another is used for reporting * write operation status (amount of bytes actually written or error code), * and the presentation position immediately after the write, see * WriteStatus structure definition. * The transport consists of three message queues: * -- command queue is used to instruct the writer thread what operation * to perform; * -- data queue is used for passing audio data from the client * to the driver; * -- status queue is used for reporting operation status * (e.g. amount of bytes actually written or error code). * The driver operates on a dedicated thread. * * @param frameSize the size of a single frame, in bytes. * @param framesCount the number of frames in a buffer. * @param threadPriority priority of the thread that performs writes. * @param threadPriority priority of the driver thread. * @return retval OK if both message queues were created successfully. * INVALID_STATE if the method was already called. * INVALID_ARGUMENTS if there was a problem setting up * the queues. * @return commandMQ a message queue used for passing commands. * @return dataMQ a message queue used for passing audio data in the format * specified at the stream opening. * @return statusMQ a message queue used for passing status from the driver Loading @@ -92,7 +105,9 @@ interface IStreamOut extends IStream { ThreadPriority threadPriority) generates ( Result retval, fmq_sync<uint8_t> dataMQ, fmq_sync<WriteStatus> statusMQ); fmq_sync<WriteCommand> commandMQ, fmq_sync<uint8_t> dataMQ, fmq_sync<WriteStatus> statusMQ); /* * Return the number of audio frames written by the audio DSP to DAC since Loading
audio/2.0/default/StreamIn.cpp +80 −34 Original line number Diff line number Diff line Loading @@ -38,6 +38,7 @@ class ReadThread : public Thread { // ReadThread's lifespan never exceeds StreamIn's lifespan. ReadThread(std::atomic<bool>* stop, audio_stream_in_t* stream, StreamIn::CommandMQ* commandMQ, StreamIn::DataMQ* dataMQ, StreamIn::StatusMQ* statusMQ, EventFlag* efGroup, Loading @@ -45,6 +46,7 @@ class ReadThread : public Thread { : Thread(false /*canCallJava*/), mStop(stop), mStream(stream), mCommandMQ(commandMQ), mDataMQ(dataMQ), mStatusMQ(statusMQ), mEfGroup(efGroup), Loading @@ -58,13 +60,19 @@ class ReadThread : public Thread { private: std::atomic<bool>* mStop; audio_stream_in_t* mStream; StreamIn::CommandMQ* mCommandMQ; StreamIn::DataMQ* mDataMQ; StreamIn::StatusMQ* mStatusMQ; EventFlag* mEfGroup; ThreadPriority mThreadPriority; std::unique_ptr<uint8_t[]> mBuffer; IStreamIn::ReadParameters mParameters; IStreamIn::ReadStatus mStatus; bool threadLoop() override; void doGetCapturePosition(); void doRead(); }; status_t ReadThread::readyToRun() { Loading @@ -77,6 +85,32 @@ status_t ReadThread::readyToRun() { return OK; } void ReadThread::doRead() { size_t availableToWrite = mDataMQ->availableToWrite(); size_t requestedToRead = mParameters.params.read; if (requestedToRead > availableToWrite) { ALOGW("truncating read data from %d to %d due to insufficient data queue space", (int32_t)requestedToRead, (int32_t)availableToWrite); requestedToRead = availableToWrite; } ssize_t readResult = mStream->read(mStream, &mBuffer[0], requestedToRead); mStatus.retval = Result::OK; uint64_t read = 0; if (readResult >= 0) { mStatus.reply.read = readResult; if (!mDataMQ->write(&mBuffer[0], readResult)) { ALOGW("data message queue write failed"); } } else { mStatus.retval = Stream::analyzeStatus("read", readResult); } } void ReadThread::doGetCapturePosition() { mStatus.retval = StreamIn::getCapturePositionImpl( mStream, &mStatus.reply.capturePosition.frames, &mStatus.reply.capturePosition.time); } bool ReadThread::threadLoop() { // This implementation doesn't return control back to the Thread until it decides to stop, // as the Thread uses mutexes, and this can lead to priority inversion. Loading @@ -87,21 +121,23 @@ bool ReadThread::threadLoop() { if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL))) { continue; // Nothing to do. } const size_t availToWrite = mDataMQ->availableToWrite(); ssize_t readResult = mStream->read(mStream, &mBuffer[0], availToWrite); Result retval = Result::OK; uint64_t read = 0; if (readResult >= 0) { read = readResult; if (!mDataMQ->write(&mBuffer[0], readResult)) { ALOGW("data message queue write failed"); } } else { retval = Stream::analyzeStatus("read", readResult); if (!mCommandMQ->read(&mParameters)) { continue; // Nothing to do. } IStreamIn::ReadStatus status = { retval, read }; if (!mStatusMQ->write(&status)) { mStatus.replyTo = mParameters.command; switch (mParameters.command) { case IStreamIn::ReadCommand::READ: doRead(); break; case IStreamIn::ReadCommand::GET_CAPTURE_POSITION: doGetCapturePosition(); break; default: ALOGE("Unknown read thread command code %d", mParameters.command); mStatus.retval = Result::NOT_SUPPORTED; break; } if (!mStatusMQ->write(&mStatus)) { ALOGW("status message queue write failed"); } mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)); Loading Loading @@ -275,17 +311,19 @@ Return<void> StreamIn::prepareForReading( if (mDataMQ) { ALOGE("the client attempts to call prepareForReading twice"); _hidl_cb(Result::INVALID_STATE, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1)); std::unique_ptr<DataMQ> tempDataMQ( new DataMQ(frameSize * framesCount, true /* EventFlag */)); std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1)); if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) { if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) { ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid"); ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid"); ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid"); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } // TODO: Remove event flag management once blocking MQ is implemented. b/33815422 Loading @@ -293,7 +331,7 @@ Return<void> StreamIn::prepareForReading( if (status != OK || !mEfGroup) { ALOGE("failed creating event flag for data MQ: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } Loading @@ -301,6 +339,7 @@ Return<void> StreamIn::prepareForReading( mReadThread = new ReadThread( &mStopReadThread, mStream, tempCommandMQ.get(), tempDataMQ.get(), tempStatusMQ.get(), mEfGroup, Loading @@ -309,13 +348,14 @@ Return<void> StreamIn::prepareForReading( if (status != OK) { ALOGW("failed to start reader thread: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } mCommandMQ = std::move(tempCommandMQ); mDataMQ = std::move(tempDataMQ); mStatusMQ = std::move(tempStatusMQ); _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc()); _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc()); return Void(); } Loading @@ -323,22 +363,28 @@ Return<uint32_t> StreamIn::getInputFramesLost() { return mStream->get_input_frames_lost(mStream); } Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) { // static Result StreamIn::getCapturePositionImpl( audio_stream_in_t *stream, uint64_t *frames, uint64_t *time) { Result retval(Result::NOT_SUPPORTED); uint64_t frames = 0, time = 0; if (mStream->get_capture_position != NULL) { if (stream->get_capture_position != NULL) return retval; int64_t halFrames, halTime; retval = Stream::analyzeStatus( "get_capture_position", mStream->get_capture_position(mStream, &halFrames, &halTime), stream->get_capture_position(stream, &halFrames, &halTime), // HAL may have a stub function, always returning ENOSYS, don't // spam the log in this case. ENOSYS); if (retval == Result::OK) { frames = halFrames; time = halTime; } *frames = halFrames; *time = halTime; } return retval; }; Return<void> StreamIn::getCapturePosition(getCapturePosition_cb _hidl_cb) { uint64_t frames = 0, time = 0; Result retval = getCapturePositionImpl(mStream, &frames, &time); _hidl_cb(retval, frames, time); return Void(); } Loading
audio/2.0/default/StreamIn.h +5 −0 Original line number Diff line number Diff line Loading @@ -52,6 +52,7 @@ using ::android::hardware::hidl_string; using ::android::sp; struct StreamIn : public IStreamIn { typedef MessageQueue<ReadParameters, kSynchronizedReadWrite> CommandMQ; typedef MessageQueue<uint8_t, kSynchronizedReadWrite> DataMQ; typedef MessageQueue<ReadStatus, kSynchronizedReadWrite> StatusMQ; Loading Loading @@ -97,12 +98,16 @@ struct StreamIn : public IStreamIn { Return<void> createMmapBuffer(int32_t minSizeFrames, createMmapBuffer_cb _hidl_cb) override; Return<void> getMmapPosition(getMmapPosition_cb _hidl_cb) override; static Result getCapturePositionImpl( audio_stream_in_t *stream, uint64_t *frames, uint64_t *time); private: bool mIsClosed; audio_hw_device_t *mDevice; audio_stream_in_t *mStream; sp<Stream> mStreamCommon; sp<StreamMmap<audio_stream_in_t>> mStreamMmap; std::unique_ptr<CommandMQ> mCommandMQ; std::unique_ptr<DataMQ> mDataMQ; std::unique_ptr<StatusMQ> mStatusMQ; EventFlag* mEfGroup; Loading
audio/2.0/default/StreamOut.cpp +63 −23 Original line number Diff line number Diff line Loading @@ -36,6 +36,7 @@ class WriteThread : public Thread { // WriteThread's lifespan never exceeds StreamOut's lifespan. WriteThread(std::atomic<bool>* stop, audio_stream_out_t* stream, StreamOut::CommandMQ* commandMQ, StreamOut::DataMQ* dataMQ, StreamOut::StatusMQ* statusMQ, EventFlag* efGroup, Loading @@ -43,6 +44,7 @@ class WriteThread : public Thread { : Thread(false /*canCallJava*/), mStop(stop), mStream(stream), mCommandMQ(commandMQ), mDataMQ(dataMQ), mStatusMQ(statusMQ), mEfGroup(efGroup), Loading @@ -56,13 +58,19 @@ class WriteThread : public Thread { private: std::atomic<bool>* mStop; audio_stream_out_t* mStream; StreamOut::CommandMQ* mCommandMQ; StreamOut::DataMQ* mDataMQ; StreamOut::StatusMQ* mStatusMQ; EventFlag* mEfGroup; ThreadPriority mThreadPriority; std::unique_ptr<uint8_t[]> mBuffer; IStreamOut::WriteStatus mStatus; bool threadLoop() override; void doGetLatency(); void doGetPresentationPosition(); void doWrite(); }; status_t WriteThread::readyToRun() { Loading @@ -75,6 +83,32 @@ status_t WriteThread::readyToRun() { return OK; } void WriteThread::doWrite() { const size_t availToRead = mDataMQ->availableToRead(); mStatus.retval = Result::OK; mStatus.reply.written = 0; if (mDataMQ->read(&mBuffer[0], availToRead)) { ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead); if (writeResult >= 0) { mStatus.reply.written = writeResult; } else { mStatus.retval = Stream::analyzeStatus("write", writeResult); } } } void WriteThread::doGetPresentationPosition() { mStatus.retval = StreamOut::getPresentationPositionImpl( mStream, &mStatus.reply.presentationPosition.frames, &mStatus.reply.presentationPosition.timeStamp); } void WriteThread::doGetLatency() { mStatus.retval = Result::OK; mStatus.reply.latencyMs = mStream->get_latency(mStream); } bool WriteThread::threadLoop() { // This implementation doesn't return control back to the Thread until it decides to stop, // as the Thread uses mutexes, and this can lead to priority inversion. Loading @@ -86,24 +120,26 @@ bool WriteThread::threadLoop() { if (!(efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY))) { continue; // Nothing to do. } const size_t availToRead = mDataMQ->availableToRead(); IStreamOut::WriteStatus status; status.writeRetval = Result::OK; status.written = 0; if (mDataMQ->read(&mBuffer[0], availToRead)) { ssize_t writeResult = mStream->write(mStream, &mBuffer[0], availToRead); if (writeResult >= 0) { status.written = writeResult; } else { status.writeRetval = Stream::analyzeStatus("write", writeResult); if (!mCommandMQ->read(&mStatus.replyTo)) { continue; // Nothing to do. } switch (mStatus.replyTo) { case IStreamOut::WriteCommand::WRITE: doWrite(); break; case IStreamOut::WriteCommand::GET_PRESENTATION_POSITION: doGetPresentationPosition(); break; case IStreamOut::WriteCommand::GET_LATENCY: doGetLatency(); break; default: ALOGE("Unknown write thread command code %d", mStatus.replyTo); mStatus.retval = Result::NOT_SUPPORTED; break; } status.presentationPositionRetval = status.writeRetval == Result::OK ? StreamOut::getPresentationPositionImpl(mStream, &status.frames, &status.timeStamp) : Result::OK; if (!mStatusMQ->write(&status)) { ALOGW("status message queue write failed"); if (!mStatusMQ->write(&mStatus)) { ALOGE("status message queue write failed"); } mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)); } Loading Loading @@ -259,17 +295,19 @@ Return<void> StreamOut::prepareForWriting( if (mDataMQ) { ALOGE("the client attempts to call prepareForWriting twice"); _hidl_cb(Result::INVALID_STATE, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } std::unique_ptr<CommandMQ> tempCommandMQ(new CommandMQ(1)); std::unique_ptr<DataMQ> tempDataMQ( new DataMQ(frameSize * framesCount, true /* EventFlag */)); std::unique_ptr<StatusMQ> tempStatusMQ(new StatusMQ(1)); if (!tempDataMQ->isValid() || !tempStatusMQ->isValid()) { if (!tempCommandMQ->isValid() || !tempDataMQ->isValid() || !tempStatusMQ->isValid()) { ALOGE_IF(!tempCommandMQ->isValid(), "command MQ is invalid"); ALOGE_IF(!tempDataMQ->isValid(), "data MQ is invalid"); ALOGE_IF(!tempStatusMQ->isValid(), "status MQ is invalid"); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } // TODO: Remove event flag management once blocking MQ is implemented. b/33815422 Loading @@ -277,7 +315,7 @@ Return<void> StreamOut::prepareForWriting( if (status != OK || !mEfGroup) { ALOGE("failed creating event flag for data MQ: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } Loading @@ -285,6 +323,7 @@ Return<void> StreamOut::prepareForWriting( mWriteThread = new WriteThread( &mStopWriteThread, mStream, tempCommandMQ.get(), tempDataMQ.get(), tempStatusMQ.get(), mEfGroup, Loading @@ -293,13 +332,14 @@ Return<void> StreamOut::prepareForWriting( if (status != OK) { ALOGW("failed to start writer thread: %s", strerror(-status)); _hidl_cb(Result::INVALID_ARGUMENTS, DataMQ::Descriptor(), StatusMQ::Descriptor()); CommandMQ::Descriptor(), DataMQ::Descriptor(), StatusMQ::Descriptor()); return Void(); } mCommandMQ = std::move(tempCommandMQ); mDataMQ = std::move(tempDataMQ); mStatusMQ = std::move(tempStatusMQ); _hidl_cb(Result::OK, *mDataMQ->getDesc(), *mStatusMQ->getDesc()); _hidl_cb(Result::OK, *mCommandMQ->getDesc(), *mDataMQ->getDesc(), *mStatusMQ->getDesc()); return Void(); } Loading