Loading services/oboeservice/AAudioCommandQueue.cpp +23 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,10 @@ namespace aaudio { aaudio_result_t AAudioCommandQueue::sendCommand(std::shared_ptr<AAudioCommand> command) { { std::scoped_lock<std::mutex> _l(mLock); if (!mRunning) { ALOGE("Tried to send command while it was not running"); return AAUDIO_ERROR_INVALID_STATE; } mCommands.push(command); mWaitWorkCond.notify_one(); } Loading Loading @@ -68,7 +72,7 @@ std::shared_ptr<AAudioCommand> AAudioCommandQueue::waitForCommand(int64_t timeou return !mRunning || !mCommands.empty(); }); } if (!mCommands.empty()) { if (!mCommands.empty() && mRunning) { command = mCommands.front(); mCommands.pop(); } Loading @@ -76,9 +80,27 @@ std::shared_ptr<AAudioCommand> AAudioCommandQueue::waitForCommand(int64_t timeou return command; } void AAudioCommandQueue::startWaiting() { std::scoped_lock<std::mutex> _l(mLock); mRunning = true; } void AAudioCommandQueue::stopWaiting() { std::scoped_lock<std::mutex> _l(mLock); mRunning = false; // Clear all commands in the queue as the command thread is stopped. while (!mCommands.empty()) { auto command = mCommands.front(); mCommands.pop(); std::scoped_lock<std::mutex> _cl(command->lock); // If the command is waiting for result, returns AAUDIO_ERROR_INVALID_STATE // as there is no thread waiting for the command. if (command->isWaitingForReply) { command->result = AAUDIO_ERROR_INVALID_STATE; command->isWaitingForReply = false; command->conditionVariable.notify_one(); } } mWaitWorkCond.notify_one(); } Loading services/oboeservice/AAudioCommandQueue.h +7 −1 Original line number Diff line number Diff line Loading @@ -77,6 +77,12 @@ public: */ std::shared_ptr<AAudioCommand> waitForCommand(int64_t timeoutNanos = -1); /** * Start waiting for commands. Commands can only be pushed into the command queue after it * starts waiting. */ void startWaiting(); /** * Force stop waiting for next command */ Loading @@ -87,7 +93,7 @@ private: std::condition_variable mWaitWorkCond; std::queue<std::shared_ptr<AAudioCommand>> mCommands GUARDED_BY(mLock); bool mRunning GUARDED_BY(mLock) = true; bool mRunning GUARDED_BY(mLock) = false; }; } // namespace aaudio No newline at end of file services/oboeservice/AAudioServiceStreamBase.cpp +45 −46 Original line number Diff line number Diff line Loading @@ -52,7 +52,6 @@ AAudioServiceStreamBase::AAudioServiceStreamBase(AAudioService &audioService) , mAtomicStreamTimestamp() , mAudioService(audioService) { mMmapClient.attributionSource = AttributionSourceState(); mThreadEnabled = true; } AAudioServiceStreamBase::~AAudioServiceStreamBase() { Loading Loading @@ -178,6 +177,7 @@ aaudio_result_t AAudioServiceStreamBase::open(const aaudio::AAudioStreamRequest // Make sure this object does not get deleted before the run() method // can protect it by making a strong pointer. mCommandQueue.startWaiting(); mThreadEnabled = true; incStrong(nullptr); // See run() method. result = mCommandThread.start(this); Loading @@ -188,14 +188,15 @@ aaudio_result_t AAudioServiceStreamBase::open(const aaudio::AAudioStreamRequest return result; error: close(); closeAndClear(); mThreadEnabled = false; mCommandQueue.stopWaiting(); mCommandThread.stop(); return result; } aaudio_result_t AAudioServiceStreamBase::close() { auto command = std::make_shared<AAudioCommand>( CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); aaudio_result_t result = mCommandQueue.sendCommand(command); aaudio_result_t result = sendCommand(CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); // Stop the command thread as the stream is closed. mThreadEnabled = false; Loading @@ -213,25 +214,7 @@ aaudio_result_t AAudioServiceStreamBase::close_l() { // This will stop the stream, just in case it was not already stopped. stop_l(); aaudio_result_t result = AAUDIO_OK; sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote(); if (endpoint == nullptr) { result = AAUDIO_ERROR_INVALID_STATE; } else { endpoint->unregisterStream(this); AAudioEndpointManager &endpointManager = AAudioEndpointManager::getInstance(); endpointManager.closeEndpoint(endpoint); // AAudioService::closeStream() prevents two threads from closing at the same time. mServiceEndpoint.clear(); // endpoint will hold the pointer after this method returns. } setState(AAUDIO_STREAM_STATE_CLOSED); mediametrics::LogItem(mMetricsId) .set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_CLOSE) .record(); return result; return closeAndClear(); } aaudio_result_t AAudioServiceStreamBase::startDevice() { Loading @@ -250,9 +233,7 @@ aaudio_result_t AAudioServiceStreamBase::startDevice() { * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete. */ aaudio_result_t AAudioServiceStreamBase::start() { auto command = std::make_shared<AAudioCommand>( START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); return sendCommand(START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); } aaudio_result_t AAudioServiceStreamBase::start_l() { Loading Loading @@ -300,9 +281,7 @@ error: } aaudio_result_t AAudioServiceStreamBase::pause() { auto command = std::make_shared<AAudioCommand>( PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); return sendCommand(PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); } aaudio_result_t AAudioServiceStreamBase::pause_l() { Loading Loading @@ -338,9 +317,7 @@ aaudio_result_t AAudioServiceStreamBase::pause_l() { } aaudio_result_t AAudioServiceStreamBase::stop() { auto command = std::make_shared<AAudioCommand>( STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); return sendCommand(STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); } aaudio_result_t AAudioServiceStreamBase::stop_l() { Loading Loading @@ -385,9 +362,7 @@ aaudio_result_t AAudioServiceStreamBase::stop_l() { } aaudio_result_t AAudioServiceStreamBase::flush() { auto command = std::make_shared<AAudioCommand>( FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); return sendCommand(FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); } aaudio_result_t AAudioServiceStreamBase::flush_l() { Loading Loading @@ -514,8 +489,7 @@ void AAudioServiceStreamBase::run() { } void AAudioServiceStreamBase::disconnect() { auto command = std::make_shared<AAudioCommand>(DISCONNECT); mCommandQueue.sendCommand(command); sendCommand(DISCONNECT); } void AAudioServiceStreamBase::disconnect_l() { Loading @@ -533,12 +507,10 @@ void AAudioServiceStreamBase::disconnect_l() { aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId, int priority) { const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review auto command = std::make_shared<AAudioCommand>( REGISTER_AUDIO_THREAD, return sendCommand(REGISTER_AUDIO_THREAD, std::make_shared<RegisterAudioThreadParam>(ownerPid, clientThreadId, priority), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l( Loading @@ -561,12 +533,10 @@ aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l( } aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread(pid_t clientThreadId) { auto command = std::make_shared<AAudioCommand>( UNREGISTER_AUDIO_THREAD, return sendCommand(UNREGISTER_AUDIO_THREAD, std::make_shared<UnregisterAudioThreadParam>(clientThreadId), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread_l(pid_t clientThreadId) { Loading Loading @@ -682,12 +652,11 @@ aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp_l() { * used to communicate with the underlying HAL or Service. */ aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) { auto command = std::make_shared<AAudioCommand>( return sendCommand( GET_DESCRIPTION, std::make_shared<GetDescriptionParam>(&parcelable), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelable* parcelable) { Loading @@ -707,3 +676,33 @@ aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelabl void AAudioServiceStreamBase::onVolumeChanged(float volume) { sendServiceEvent(AAUDIO_SERVICE_EVENT_VOLUME, volume); } aaudio_result_t AAudioServiceStreamBase::sendCommand(aaudio_command_opcode opCode, std::shared_ptr<AAudioCommandParam> param, bool waitForReply, int64_t timeoutNanos) { return mCommandQueue.sendCommand(std::make_shared<AAudioCommand>( opCode, param, waitForReply, timeoutNanos)); } aaudio_result_t AAudioServiceStreamBase::closeAndClear() { aaudio_result_t result = AAUDIO_OK; sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote(); if (endpoint == nullptr) { result = AAUDIO_ERROR_INVALID_STATE; } else { endpoint->unregisterStream(this); AAudioEndpointManager &endpointManager = AAudioEndpointManager::getInstance(); endpointManager.closeEndpoint(endpoint); // AAudioService::closeStream() prevents two threads from closing at the same time. mServiceEndpoint.clear(); // endpoint will hold the pointer after this method returns. } setState(AAUDIO_STREAM_STATE_CLOSED); mediametrics::LogItem(mMetricsId) .set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_CLOSE) .record(); return result; } services/oboeservice/AAudioServiceStreamBase.h +7 −0 Original line number Diff line number Diff line Loading @@ -366,6 +366,13 @@ private: aaudio_result_t sendServiceEvent(aaudio_service_event_t event, double dataDouble); aaudio_result_t sendCommand(aaudio_command_opcode opCode, std::shared_ptr<AAudioCommandParam> param = nullptr, bool waitForReply = false, int64_t timeoutNanos = 0); aaudio_result_t closeAndClear(); /** * @return true if the queue is getting full. */ Loading Loading
services/oboeservice/AAudioCommandQueue.cpp +23 −1 Original line number Diff line number Diff line Loading @@ -28,6 +28,10 @@ namespace aaudio { aaudio_result_t AAudioCommandQueue::sendCommand(std::shared_ptr<AAudioCommand> command) { { std::scoped_lock<std::mutex> _l(mLock); if (!mRunning) { ALOGE("Tried to send command while it was not running"); return AAUDIO_ERROR_INVALID_STATE; } mCommands.push(command); mWaitWorkCond.notify_one(); } Loading Loading @@ -68,7 +72,7 @@ std::shared_ptr<AAudioCommand> AAudioCommandQueue::waitForCommand(int64_t timeou return !mRunning || !mCommands.empty(); }); } if (!mCommands.empty()) { if (!mCommands.empty() && mRunning) { command = mCommands.front(); mCommands.pop(); } Loading @@ -76,9 +80,27 @@ std::shared_ptr<AAudioCommand> AAudioCommandQueue::waitForCommand(int64_t timeou return command; } void AAudioCommandQueue::startWaiting() { std::scoped_lock<std::mutex> _l(mLock); mRunning = true; } void AAudioCommandQueue::stopWaiting() { std::scoped_lock<std::mutex> _l(mLock); mRunning = false; // Clear all commands in the queue as the command thread is stopped. while (!mCommands.empty()) { auto command = mCommands.front(); mCommands.pop(); std::scoped_lock<std::mutex> _cl(command->lock); // If the command is waiting for result, returns AAUDIO_ERROR_INVALID_STATE // as there is no thread waiting for the command. if (command->isWaitingForReply) { command->result = AAUDIO_ERROR_INVALID_STATE; command->isWaitingForReply = false; command->conditionVariable.notify_one(); } } mWaitWorkCond.notify_one(); } Loading
services/oboeservice/AAudioCommandQueue.h +7 −1 Original line number Diff line number Diff line Loading @@ -77,6 +77,12 @@ public: */ std::shared_ptr<AAudioCommand> waitForCommand(int64_t timeoutNanos = -1); /** * Start waiting for commands. Commands can only be pushed into the command queue after it * starts waiting. */ void startWaiting(); /** * Force stop waiting for next command */ Loading @@ -87,7 +93,7 @@ private: std::condition_variable mWaitWorkCond; std::queue<std::shared_ptr<AAudioCommand>> mCommands GUARDED_BY(mLock); bool mRunning GUARDED_BY(mLock) = true; bool mRunning GUARDED_BY(mLock) = false; }; } // namespace aaudio No newline at end of file
services/oboeservice/AAudioServiceStreamBase.cpp +45 −46 Original line number Diff line number Diff line Loading @@ -52,7 +52,6 @@ AAudioServiceStreamBase::AAudioServiceStreamBase(AAudioService &audioService) , mAtomicStreamTimestamp() , mAudioService(audioService) { mMmapClient.attributionSource = AttributionSourceState(); mThreadEnabled = true; } AAudioServiceStreamBase::~AAudioServiceStreamBase() { Loading Loading @@ -178,6 +177,7 @@ aaudio_result_t AAudioServiceStreamBase::open(const aaudio::AAudioStreamRequest // Make sure this object does not get deleted before the run() method // can protect it by making a strong pointer. mCommandQueue.startWaiting(); mThreadEnabled = true; incStrong(nullptr); // See run() method. result = mCommandThread.start(this); Loading @@ -188,14 +188,15 @@ aaudio_result_t AAudioServiceStreamBase::open(const aaudio::AAudioStreamRequest return result; error: close(); closeAndClear(); mThreadEnabled = false; mCommandQueue.stopWaiting(); mCommandThread.stop(); return result; } aaudio_result_t AAudioServiceStreamBase::close() { auto command = std::make_shared<AAudioCommand>( CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); aaudio_result_t result = mCommandQueue.sendCommand(command); aaudio_result_t result = sendCommand(CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); // Stop the command thread as the stream is closed. mThreadEnabled = false; Loading @@ -213,25 +214,7 @@ aaudio_result_t AAudioServiceStreamBase::close_l() { // This will stop the stream, just in case it was not already stopped. stop_l(); aaudio_result_t result = AAUDIO_OK; sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote(); if (endpoint == nullptr) { result = AAUDIO_ERROR_INVALID_STATE; } else { endpoint->unregisterStream(this); AAudioEndpointManager &endpointManager = AAudioEndpointManager::getInstance(); endpointManager.closeEndpoint(endpoint); // AAudioService::closeStream() prevents two threads from closing at the same time. mServiceEndpoint.clear(); // endpoint will hold the pointer after this method returns. } setState(AAUDIO_STREAM_STATE_CLOSED); mediametrics::LogItem(mMetricsId) .set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_CLOSE) .record(); return result; return closeAndClear(); } aaudio_result_t AAudioServiceStreamBase::startDevice() { Loading @@ -250,9 +233,7 @@ aaudio_result_t AAudioServiceStreamBase::startDevice() { * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete. */ aaudio_result_t AAudioServiceStreamBase::start() { auto command = std::make_shared<AAudioCommand>( START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); return sendCommand(START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); } aaudio_result_t AAudioServiceStreamBase::start_l() { Loading Loading @@ -300,9 +281,7 @@ error: } aaudio_result_t AAudioServiceStreamBase::pause() { auto command = std::make_shared<AAudioCommand>( PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); return sendCommand(PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); } aaudio_result_t AAudioServiceStreamBase::pause_l() { Loading Loading @@ -338,9 +317,7 @@ aaudio_result_t AAudioServiceStreamBase::pause_l() { } aaudio_result_t AAudioServiceStreamBase::stop() { auto command = std::make_shared<AAudioCommand>( STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); return sendCommand(STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); } aaudio_result_t AAudioServiceStreamBase::stop_l() { Loading Loading @@ -385,9 +362,7 @@ aaudio_result_t AAudioServiceStreamBase::stop_l() { } aaudio_result_t AAudioServiceStreamBase::flush() { auto command = std::make_shared<AAudioCommand>( FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); return sendCommand(FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); } aaudio_result_t AAudioServiceStreamBase::flush_l() { Loading Loading @@ -514,8 +489,7 @@ void AAudioServiceStreamBase::run() { } void AAudioServiceStreamBase::disconnect() { auto command = std::make_shared<AAudioCommand>(DISCONNECT); mCommandQueue.sendCommand(command); sendCommand(DISCONNECT); } void AAudioServiceStreamBase::disconnect_l() { Loading @@ -533,12 +507,10 @@ void AAudioServiceStreamBase::disconnect_l() { aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId, int priority) { const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review auto command = std::make_shared<AAudioCommand>( REGISTER_AUDIO_THREAD, return sendCommand(REGISTER_AUDIO_THREAD, std::make_shared<RegisterAudioThreadParam>(ownerPid, clientThreadId, priority), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l( Loading @@ -561,12 +533,10 @@ aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l( } aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread(pid_t clientThreadId) { auto command = std::make_shared<AAudioCommand>( UNREGISTER_AUDIO_THREAD, return sendCommand(UNREGISTER_AUDIO_THREAD, std::make_shared<UnregisterAudioThreadParam>(clientThreadId), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread_l(pid_t clientThreadId) { Loading Loading @@ -682,12 +652,11 @@ aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp_l() { * used to communicate with the underlying HAL or Service. */ aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) { auto command = std::make_shared<AAudioCommand>( return sendCommand( GET_DESCRIPTION, std::make_shared<GetDescriptionParam>(&parcelable), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelable* parcelable) { Loading @@ -707,3 +676,33 @@ aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelabl void AAudioServiceStreamBase::onVolumeChanged(float volume) { sendServiceEvent(AAUDIO_SERVICE_EVENT_VOLUME, volume); } aaudio_result_t AAudioServiceStreamBase::sendCommand(aaudio_command_opcode opCode, std::shared_ptr<AAudioCommandParam> param, bool waitForReply, int64_t timeoutNanos) { return mCommandQueue.sendCommand(std::make_shared<AAudioCommand>( opCode, param, waitForReply, timeoutNanos)); } aaudio_result_t AAudioServiceStreamBase::closeAndClear() { aaudio_result_t result = AAUDIO_OK; sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote(); if (endpoint == nullptr) { result = AAUDIO_ERROR_INVALID_STATE; } else { endpoint->unregisterStream(this); AAudioEndpointManager &endpointManager = AAudioEndpointManager::getInstance(); endpointManager.closeEndpoint(endpoint); // AAudioService::closeStream() prevents two threads from closing at the same time. mServiceEndpoint.clear(); // endpoint will hold the pointer after this method returns. } setState(AAUDIO_STREAM_STATE_CLOSED); mediametrics::LogItem(mMetricsId) .set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_CLOSE) .record(); return result; }
services/oboeservice/AAudioServiceStreamBase.h +7 −0 Original line number Diff line number Diff line Loading @@ -366,6 +366,13 @@ private: aaudio_result_t sendServiceEvent(aaudio_service_event_t event, double dataDouble); aaudio_result_t sendCommand(aaudio_command_opcode opCode, std::shared_ptr<AAudioCommandParam> param = nullptr, bool waitForReply = false, int64_t timeoutNanos = 0); aaudio_result_t closeAndClear(); /** * @return true if the queue is getting full. */ Loading