Loading services/oboeservice/AAudioCommandQueue.cpp 0 → 100644 +85 −0 Original line number Diff line number Diff line /* * Copyright (C) 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "AAudioCommandQueue" //#define LOG_NDEBUG 0 #include <chrono> #include <utils/Log.h> #include "AAudioCommandQueue.h" namespace aaudio { aaudio_result_t AAudioCommandQueue::sendCommand(std::shared_ptr<AAudioCommand> command) { { std::scoped_lock<std::mutex> _l(mLock); mCommands.push(command); mWaitWorkCond.notify_one(); } std::unique_lock _cl(command->lock); android::base::ScopedLockAssertion lockAssertion(command->lock); ALOGV("Sending command %d, wait for reply(%d) with timeout %jd", command->operationCode, command->isWaitingForReply, command->timeoutNanoseconds); // `mWaitForReply` is first initialized when the command is constructed. It will be flipped // when the command is completed. auto timeoutExpire = std::chrono::steady_clock::now() + std::chrono::nanoseconds(command->timeoutNanoseconds); while (command->isWaitingForReply) { if (command->conditionVariable.wait_until(_cl, timeoutExpire) == std::cv_status::timeout) { ALOGD("Command %d time out", command->operationCode); command->result = AAUDIO_ERROR_TIMEOUT; command->isWaitingForReply = false; } } ALOGV("Command %d sent with result as %d", command->operationCode, command->result); return command->result; } std::shared_ptr<AAudioCommand> AAudioCommandQueue::waitForCommand(int64_t timeoutNanos) { std::shared_ptr<AAudioCommand> command; { std::unique_lock _l(mLock); android::base::ScopedLockAssertion lockAssertion(mLock); if (timeoutNanos >= 0) { mWaitWorkCond.wait_for(_l, std::chrono::nanoseconds(timeoutNanos), [this]() { android::base::ScopedLockAssertion lockAssertion(mLock); return !mRunning || !mCommands.empty(); }); } else { mWaitWorkCond.wait(_l, [this]() { android::base::ScopedLockAssertion lockAssertion(mLock); return !mRunning || !mCommands.empty(); }); } if (!mCommands.empty()) { command = mCommands.front(); mCommands.pop(); } } return command; } void AAudioCommandQueue::stopWaiting() { std::scoped_lock<std::mutex> _l(mLock); mRunning = false; mWaitWorkCond.notify_one(); } } // namespace aaudio No newline at end of file services/oboeservice/AAudioCommandQueue.h 0 → 100644 +93 −0 Original line number Diff line number Diff line /* * Copyright (C) 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <condition_variable> #include <memory> #include <mutex> #include <queue> #include <aaudio/AAudio.h> #include <android-base/thread_annotations.h> namespace aaudio { typedef int32_t aaudio_command_opcode; class AAudioCommandParam { public: AAudioCommandParam() = default; virtual ~AAudioCommandParam() = default; }; class AAudioCommand { public: explicit AAudioCommand( aaudio_command_opcode opCode, std::shared_ptr<AAudioCommandParam> param = nullptr, bool waitForReply = false, int64_t timeoutNanos = 0) : operationCode(opCode), parameter(param), isWaitingForReply(waitForReply), timeoutNanoseconds(timeoutNanos) { } virtual ~AAudioCommand() = default; std::mutex lock; std::condition_variable conditionVariable; const aaudio_command_opcode operationCode; std::shared_ptr<AAudioCommandParam> parameter; bool isWaitingForReply GUARDED_BY(lock); const int64_t timeoutNanoseconds; aaudio_result_t result GUARDED_BY(lock) = AAUDIO_OK; }; class AAudioCommandQueue { public: AAudioCommandQueue() = default; ~AAudioCommandQueue() = default; /** * Send a command to the command queue. The return will be waiting for a specified timeout * period indicated by the command if it is required. * * @param command the command to send to the command queue. * @return the result of sending the command or the result of executing the command if command * need to wait for a reply. If timeout happens, AAUDIO_ERROR_TIMEOUT will be returned. */ aaudio_result_t sendCommand(std::shared_ptr<AAudioCommand> command); /** * Wait for next available command OR until the timeout is expired. * * @param timeoutNanos the maximum time to wait for next command (0 means return immediately in * any case), negative to wait forever. * @return the next available command if any or a nullptr when there is none. */ std::shared_ptr<AAudioCommand> waitForCommand(int64_t timeoutNanos = -1); /** * Force stop waiting for next command */ void stopWaiting(); private: std::mutex mLock; std::condition_variable mWaitWorkCond; std::queue<std::shared_ptr<AAudioCommand>> mCommands GUARDED_BY(mLock); bool mRunning GUARDED_BY(mLock) = true; }; } // namespace aaudio No newline at end of file services/oboeservice/AAudioServiceEndpointMMAP.cpp +7 −6 Original line number Diff line number Diff line Loading @@ -401,16 +401,17 @@ void AAudioServiceEndpointMMAP::onRoutingChanged(audio_port_handle_t portHandle) /** * Get an immutable description of the data queue from the HAL. */ aaudio_result_t AAudioServiceEndpointMMAP::getDownDataDescription(AudioEndpointParcelable &parcelable) aaudio_result_t AAudioServiceEndpointMMAP::getDownDataDescription( AudioEndpointParcelable* parcelable) { // Gather information on the data queue based on HAL info. int32_t bytesPerFrame = calculateBytesPerFrame(); int32_t capacityInBytes = getBufferCapacity() * bytesPerFrame; int fdIndex = parcelable.addFileDescriptor(mAudioDataFileDescriptor, capacityInBytes); parcelable.mDownDataQueueParcelable.setupMemory(fdIndex, 0, capacityInBytes); parcelable.mDownDataQueueParcelable.setBytesPerFrame(bytesPerFrame); parcelable.mDownDataQueueParcelable.setFramesPerBurst(mFramesPerBurst); parcelable.mDownDataQueueParcelable.setCapacityInFrames(getBufferCapacity()); int fdIndex = parcelable->addFileDescriptor(mAudioDataFileDescriptor, capacityInBytes); parcelable->mDownDataQueueParcelable.setupMemory(fdIndex, 0, capacityInBytes); parcelable->mDownDataQueueParcelable.setBytesPerFrame(bytesPerFrame); parcelable->mDownDataQueueParcelable.setFramesPerBurst(mFramesPerBurst); parcelable->mDownDataQueueParcelable.setCapacityInFrames(getBufferCapacity()); return AAUDIO_OK; } Loading services/oboeservice/AAudioServiceEndpointMMAP.h +1 −1 Original line number Diff line number Diff line Loading @@ -79,7 +79,7 @@ public: void onRoutingChanged(audio_port_handle_t portHandle) override; // ------------------------------------------------------------------------------ aaudio_result_t getDownDataDescription(AudioEndpointParcelable &parcelable); aaudio_result_t getDownDataDescription(AudioEndpointParcelable* parcelable); int64_t getHardwareTimeOffsetNanos() const { return mHardwareTimeOffsetNanos; Loading services/oboeservice/AAudioServiceStreamBase.cpp +173 −79 Original line number Diff line number Diff line Loading @@ -34,23 +34,25 @@ #include "AAudioService.h" #include "AAudioServiceEndpoint.h" #include "AAudioServiceStreamBase.h" #include "TimestampScheduler.h" using namespace android; // TODO just import names needed using namespace aaudio; // TODO just import names needed using content::AttributionSourceState; static const int64_t TIMEOUT_NANOS = 3LL * 1000 * 1000 * 1000; /** * Base class for streams in the service. * @return */ AAudioServiceStreamBase::AAudioServiceStreamBase(AAudioService &audioService) : mTimestampThread("AATime") : mCommandThread("AACommand") , mAtomicStreamTimestamp() , mAudioService(audioService) { mMmapClient.attributionSource = AttributionSourceState(); mThreadEnabled = true; } AAudioServiceStreamBase::~AAudioServiceStreamBase() { Loading @@ -70,6 +72,13 @@ AAudioServiceStreamBase::~AAudioServiceStreamBase() { || getState() == AAUDIO_STREAM_STATE_UNINITIALIZED), "service stream %p still open, state = %d", this, getState()); // Stop the command thread before destroying. if (mThreadEnabled) { mThreadEnabled = false; mCommandQueue.stopWaiting(); mCommandThread.stop(); } } std::string AAudioServiceStreamBase::dumpHeader() { Loading Loading @@ -166,6 +175,16 @@ aaudio_result_t AAudioServiceStreamBase::open(const aaudio::AAudioStreamRequest mFramesPerBurst = mServiceEndpoint->getFramesPerBurst(); copyFrom(*mServiceEndpoint); } // Make sure this object does not get deleted before the run() method // can protect it by making a strong pointer. mThreadEnabled = true; incStrong(nullptr); // See run() method. result = mCommandThread.start(this); if (result != AAUDIO_OK) { decStrong(nullptr); // run() can't do it so we have to do it here. goto error; } return result; error: Loading @@ -174,8 +193,16 @@ error: } aaudio_result_t AAudioServiceStreamBase::close() { std::lock_guard<std::mutex> lock(mLock); return close_l(); auto command = std::make_shared<AAudioCommand>( CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); aaudio_result_t result = mCommandQueue.sendCommand(command); // Stop the command thread as the stream is closed. mThreadEnabled = false; mCommandQueue.stopWaiting(); mCommandThread.stop(); return result; } aaudio_result_t AAudioServiceStreamBase::close_l() { Loading @@ -183,8 +210,7 @@ aaudio_result_t AAudioServiceStreamBase::close_l() { return AAUDIO_OK; } // This will call stopTimestampThread() and also stop the stream, // just in case it was not already stopped. // This will stop the stream, just in case it was not already stopped. stop_l(); aaudio_result_t result = AAUDIO_OK; Loading Loading @@ -224,8 +250,12 @@ aaudio_result_t AAudioServiceStreamBase::startDevice() { * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete. */ aaudio_result_t AAudioServiceStreamBase::start() { std::lock_guard<std::mutex> lock(mLock); auto command = std::make_shared<AAudioCommand>( START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::start_l() { const int64_t beginNs = AudioClock::getNanoseconds(); aaudio_result_t result = AAUDIO_OK; Loading Loading @@ -261,15 +291,6 @@ aaudio_result_t AAudioServiceStreamBase::start() { // This should happen at the end of the start. sendServiceEvent(AAUDIO_SERVICE_EVENT_STARTED); setState(AAUDIO_STREAM_STATE_STARTED); mThreadEnabled.store(true); // Make sure this object does not get deleted before the run() method // can protect it by making a strong pointer. incStrong(nullptr); // See run() method. result = mTimestampThread.start(this); if (result != AAUDIO_OK) { decStrong(nullptr); // run() can't do it so we have to do it here. goto error; } return result; Loading @@ -279,8 +300,9 @@ error: } aaudio_result_t AAudioServiceStreamBase::pause() { std::lock_guard<std::mutex> lock(mLock); return pause_l(); auto command = std::make_shared<AAudioCommand>( PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::pause_l() { Loading @@ -298,12 +320,6 @@ aaudio_result_t AAudioServiceStreamBase::pause_l() { .set(AMEDIAMETRICS_PROP_STATUS, (int32_t)result) .record(); }); result = stopTimestampThread(); if (result != AAUDIO_OK) { disconnect_l(); return result; } sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote(); if (endpoint == nullptr) { ALOGE("%s() has no endpoint", __func__); Loading @@ -322,8 +338,9 @@ aaudio_result_t AAudioServiceStreamBase::pause_l() { } aaudio_result_t AAudioServiceStreamBase::stop() { std::lock_guard<std::mutex> lock(mLock); return stop_l(); auto command = std::make_shared<AAudioCommand>( STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::stop_l() { Loading @@ -343,12 +360,6 @@ aaudio_result_t AAudioServiceStreamBase::stop_l() { setState(AAUDIO_STREAM_STATE_STOPPING); // Temporarily unlock because we are joining the timestamp thread and it may try // to acquire mLock. mLock.unlock(); result = stopTimestampThread(); mLock.lock(); if (result != AAUDIO_OK) { disconnect_l(); return result; Loading @@ -373,17 +384,13 @@ aaudio_result_t AAudioServiceStreamBase::stop_l() { return result; } aaudio_result_t AAudioServiceStreamBase::stopTimestampThread() { aaudio_result_t result = AAUDIO_OK; // clear flag that tells thread to loop if (mThreadEnabled.exchange(false)) { result = mTimestampThread.stop(); } return result; aaudio_result_t AAudioServiceStreamBase::flush() { auto command = std::make_shared<AAudioCommand>( FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::flush() { std::lock_guard<std::mutex> lock(mLock); aaudio_result_t AAudioServiceStreamBase::flush_l() { aaudio_result_t result = AAudio_isFlushAllowed(getState()); if (result != AAUDIO_OK) { return result; Loading @@ -404,48 +411,111 @@ aaudio_result_t AAudioServiceStreamBase::flush() { return AAUDIO_OK; } // implement Runnable, periodically send timestamps to client // implement Runnable, periodically send timestamps to client and process commands from queue. __attribute__((no_sanitize("integer"))) void AAudioServiceStreamBase::run() { ALOGD("%s() %s entering >>>>>>>>>>>>>> TIMESTAMPS", __func__, getTypeText()); ALOGD("%s() %s entering >>>>>>>>>>>>>> COMMANDS", __func__, getTypeText()); // Hold onto the ref counted stream until the end. android::sp<AAudioServiceStreamBase> holdStream(this); TimestampScheduler timestampScheduler; int64_t nextTime; // Balance the incStrong from when the thread was launched. holdStream->decStrong(nullptr); timestampScheduler.setBurstPeriod(mFramesPerBurst, getSampleRate()); timestampScheduler.start(AudioClock::getNanoseconds()); int64_t nextTime = timestampScheduler.nextAbsoluteTime(); // Taking mLock while starting the thread. All the operation must be able to // run with holding the lock. std::scoped_lock<std::mutex> _l(mLock); int32_t loopCount = 0; aaudio_result_t result = AAUDIO_OK; while (mThreadEnabled.load()) { loopCount++; if (AudioClock::getNanoseconds() >= nextTime) { result = sendCurrentTimestamp(); if (result != AAUDIO_OK) { ALOGE("%s() timestamp thread got result = %d", __func__, result); int64_t timeoutNanos = -1; if (isRunning()) { timeoutNanos = nextTime - AudioClock::getNanoseconds(); timeoutNanos = std::max<int64_t>(0, timeoutNanos); } auto command = mCommandQueue.waitForCommand(timeoutNanos); if (!mThreadEnabled) { // Break the loop if the thread is disabled. break; } nextTime = timestampScheduler.nextAbsoluteTime(); if (isRunning() && AudioClock::getNanoseconds() >= nextTime) { // It is time to update timestamp. if (sendCurrentTimestamp_l() != AAUDIO_OK) { ALOGE("Failed to send current timestamp, stop updating timestamp"); disconnect_l(); } else { // Sleep until it is time to send the next timestamp. // TODO Wait for a signal with a timeout so that we can stop more quickly. AudioClock::sleepUntilNanoTime(nextTime); nextTime = timestampScheduler.nextAbsoluteTime(); } } // This was moved from the calls in stop_l() and pause_l(), which could cause a deadlock // if it resulted in a call to disconnect. if (result == AAUDIO_OK) { (void) sendCurrentTimestamp(); if (command != nullptr) { std::scoped_lock<std::mutex> _commandLock(command->lock); switch (command->operationCode) { case START: command->result = start_l(); timestampScheduler.setBurstPeriod(mFramesPerBurst, getSampleRate()); timestampScheduler.start(AudioClock::getNanoseconds()); nextTime = timestampScheduler.nextAbsoluteTime(); break; case PAUSE: command->result = pause_l(); break; case STOP: command->result = stop_l(); break; case FLUSH: command->result = flush_l(); break; case CLOSE: command->result = close_l(); break; case DISCONNECT: disconnect_l(); break; case REGISTER_AUDIO_THREAD: { RegisterAudioThreadParam *param = (RegisterAudioThreadParam *) command->parameter.get(); command->result = param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT : registerAudioThread_l(param->mOwnerPid, param->mClientThreadId, param->mPriority); } ALOGD("%s() %s exiting after %d loops <<<<<<<<<<<<<< TIMESTAMPS", break; case UNREGISTER_AUDIO_THREAD: { UnregisterAudioThreadParam *param = (UnregisterAudioThreadParam *) command->parameter.get(); command->result = param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT : unregisterAudioThread_l(param->mClientThreadId); } break; case GET_DESCRIPTION: { GetDescriptionParam *param = (GetDescriptionParam *) command->parameter.get(); command->result = param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT : getDescription_l(param->mParcelable); } break; default: ALOGE("Invalid command op code: %d", command->operationCode); break; } if (command->isWaitingForReply) { command->isWaitingForReply = false; command->conditionVariable.notify_one(); } } } ALOGD("%s() %s exiting after %d loops <<<<<<<<<<<<<< COMMANDS", __func__, getTypeText(), loopCount); } void AAudioServiceStreamBase::disconnect() { std::lock_guard<std::mutex> lock(mLock); disconnect_l(); auto command = std::make_shared<AAudioCommand>(DISCONNECT); mCommandQueue.sendCommand(command); } void AAudioServiceStreamBase::disconnect_l() { Loading @@ -461,15 +531,23 @@ void AAudioServiceStreamBase::disconnect_l() { } } aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId, int priority) { std::lock_guard<std::mutex> lock(mLock); aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId, int priority) { const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review auto command = std::make_shared<AAudioCommand>( REGISTER_AUDIO_THREAD, std::make_shared<RegisterAudioThreadParam>(ownerPid, clientThreadId, priority), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l( pid_t ownerPid, pid_t clientThreadId, int priority) { aaudio_result_t result = AAUDIO_OK; if (getRegisteredThread() != AAudioServiceStreamBase::ILLEGAL_THREAD_ID) { ALOGE("AAudioService::registerAudioThread(), thread already registered"); result = AAUDIO_ERROR_INVALID_STATE; } else { const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review setRegisteredThread(clientThreadId); int err = android::requestPriority(ownerPid, clientThreadId, priority, true /* isForApp */); Loading @@ -483,7 +561,15 @@ aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadI } aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread(pid_t clientThreadId) { std::lock_guard<std::mutex> lock(mLock); auto command = std::make_shared<AAudioCommand>( UNREGISTER_AUDIO_THREAD, std::make_shared<UnregisterAudioThreadParam>(clientThreadId), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread_l(pid_t clientThreadId) { aaudio_result_t result = AAUDIO_OK; if (getRegisteredThread() != clientThreadId) { ALOGE("%s(), wrong thread", __func__); Loading Loading @@ -552,7 +638,7 @@ aaudio_result_t AAudioServiceStreamBase::sendXRunCount(int32_t xRunCount) { return sendServiceEvent(AAUDIO_SERVICE_EVENT_XRUN, (int64_t) xRunCount); } aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() { aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp_l() { AAudioServiceMessage command; // It is not worth filling up the queue with timestamps. // That can cause the stream to get suspended. Loading @@ -562,7 +648,7 @@ aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() { } // Send a timestamp for the clock model. aaudio_result_t result = getFreeRunningPosition(&command.timestamp.position, aaudio_result_t result = getFreeRunningPosition_l(&command.timestamp.position, &command.timestamp.timestamp); if (result == AAUDIO_OK) { ALOGV("%s() SERVICE %8lld at %lld", __func__, Loading @@ -573,7 +659,7 @@ aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() { if (result == AAUDIO_OK) { // Send a hardware timestamp for presentation time. result = getHardwareTimestamp(&command.timestamp.position, result = getHardwareTimestamp_l(&command.timestamp.position, &command.timestamp.timestamp); if (result == AAUDIO_OK) { ALOGV("%s() HARDWARE %8lld at %lld", __func__, Loading @@ -596,7 +682,15 @@ aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() { * used to communicate with the underlying HAL or Service. */ aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) { std::lock_guard<std::mutex> lock(mLock); auto command = std::make_shared<AAudioCommand>( GET_DESCRIPTION, std::make_shared<GetDescriptionParam>(&parcelable), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelable* parcelable) { { std::lock_guard<std::mutex> lock(mUpMessageQueueLock); if (mUpMessageQueue == nullptr) { Loading @@ -605,9 +699,9 @@ aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable } // Gather information on the message queue. mUpMessageQueue->fillParcelable(parcelable, parcelable.mUpMessageQueueParcelable); parcelable->mUpMessageQueueParcelable); } return getAudioDataDescription(parcelable); return getAudioDataDescription_l(parcelable); } void AAudioServiceStreamBase::onVolumeChanged(float volume) { Loading Loading
services/oboeservice/AAudioCommandQueue.cpp 0 → 100644 +85 −0 Original line number Diff line number Diff line /* * Copyright (C) 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #define LOG_TAG "AAudioCommandQueue" //#define LOG_NDEBUG 0 #include <chrono> #include <utils/Log.h> #include "AAudioCommandQueue.h" namespace aaudio { aaudio_result_t AAudioCommandQueue::sendCommand(std::shared_ptr<AAudioCommand> command) { { std::scoped_lock<std::mutex> _l(mLock); mCommands.push(command); mWaitWorkCond.notify_one(); } std::unique_lock _cl(command->lock); android::base::ScopedLockAssertion lockAssertion(command->lock); ALOGV("Sending command %d, wait for reply(%d) with timeout %jd", command->operationCode, command->isWaitingForReply, command->timeoutNanoseconds); // `mWaitForReply` is first initialized when the command is constructed. It will be flipped // when the command is completed. auto timeoutExpire = std::chrono::steady_clock::now() + std::chrono::nanoseconds(command->timeoutNanoseconds); while (command->isWaitingForReply) { if (command->conditionVariable.wait_until(_cl, timeoutExpire) == std::cv_status::timeout) { ALOGD("Command %d time out", command->operationCode); command->result = AAUDIO_ERROR_TIMEOUT; command->isWaitingForReply = false; } } ALOGV("Command %d sent with result as %d", command->operationCode, command->result); return command->result; } std::shared_ptr<AAudioCommand> AAudioCommandQueue::waitForCommand(int64_t timeoutNanos) { std::shared_ptr<AAudioCommand> command; { std::unique_lock _l(mLock); android::base::ScopedLockAssertion lockAssertion(mLock); if (timeoutNanos >= 0) { mWaitWorkCond.wait_for(_l, std::chrono::nanoseconds(timeoutNanos), [this]() { android::base::ScopedLockAssertion lockAssertion(mLock); return !mRunning || !mCommands.empty(); }); } else { mWaitWorkCond.wait(_l, [this]() { android::base::ScopedLockAssertion lockAssertion(mLock); return !mRunning || !mCommands.empty(); }); } if (!mCommands.empty()) { command = mCommands.front(); mCommands.pop(); } } return command; } void AAudioCommandQueue::stopWaiting() { std::scoped_lock<std::mutex> _l(mLock); mRunning = false; mWaitWorkCond.notify_one(); } } // namespace aaudio No newline at end of file
services/oboeservice/AAudioCommandQueue.h 0 → 100644 +93 −0 Original line number Diff line number Diff line /* * Copyright (C) 2021 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #pragma once #include <condition_variable> #include <memory> #include <mutex> #include <queue> #include <aaudio/AAudio.h> #include <android-base/thread_annotations.h> namespace aaudio { typedef int32_t aaudio_command_opcode; class AAudioCommandParam { public: AAudioCommandParam() = default; virtual ~AAudioCommandParam() = default; }; class AAudioCommand { public: explicit AAudioCommand( aaudio_command_opcode opCode, std::shared_ptr<AAudioCommandParam> param = nullptr, bool waitForReply = false, int64_t timeoutNanos = 0) : operationCode(opCode), parameter(param), isWaitingForReply(waitForReply), timeoutNanoseconds(timeoutNanos) { } virtual ~AAudioCommand() = default; std::mutex lock; std::condition_variable conditionVariable; const aaudio_command_opcode operationCode; std::shared_ptr<AAudioCommandParam> parameter; bool isWaitingForReply GUARDED_BY(lock); const int64_t timeoutNanoseconds; aaudio_result_t result GUARDED_BY(lock) = AAUDIO_OK; }; class AAudioCommandQueue { public: AAudioCommandQueue() = default; ~AAudioCommandQueue() = default; /** * Send a command to the command queue. The return will be waiting for a specified timeout * period indicated by the command if it is required. * * @param command the command to send to the command queue. * @return the result of sending the command or the result of executing the command if command * need to wait for a reply. If timeout happens, AAUDIO_ERROR_TIMEOUT will be returned. */ aaudio_result_t sendCommand(std::shared_ptr<AAudioCommand> command); /** * Wait for next available command OR until the timeout is expired. * * @param timeoutNanos the maximum time to wait for next command (0 means return immediately in * any case), negative to wait forever. * @return the next available command if any or a nullptr when there is none. */ std::shared_ptr<AAudioCommand> waitForCommand(int64_t timeoutNanos = -1); /** * Force stop waiting for next command */ void stopWaiting(); private: std::mutex mLock; std::condition_variable mWaitWorkCond; std::queue<std::shared_ptr<AAudioCommand>> mCommands GUARDED_BY(mLock); bool mRunning GUARDED_BY(mLock) = true; }; } // namespace aaudio No newline at end of file
services/oboeservice/AAudioServiceEndpointMMAP.cpp +7 −6 Original line number Diff line number Diff line Loading @@ -401,16 +401,17 @@ void AAudioServiceEndpointMMAP::onRoutingChanged(audio_port_handle_t portHandle) /** * Get an immutable description of the data queue from the HAL. */ aaudio_result_t AAudioServiceEndpointMMAP::getDownDataDescription(AudioEndpointParcelable &parcelable) aaudio_result_t AAudioServiceEndpointMMAP::getDownDataDescription( AudioEndpointParcelable* parcelable) { // Gather information on the data queue based on HAL info. int32_t bytesPerFrame = calculateBytesPerFrame(); int32_t capacityInBytes = getBufferCapacity() * bytesPerFrame; int fdIndex = parcelable.addFileDescriptor(mAudioDataFileDescriptor, capacityInBytes); parcelable.mDownDataQueueParcelable.setupMemory(fdIndex, 0, capacityInBytes); parcelable.mDownDataQueueParcelable.setBytesPerFrame(bytesPerFrame); parcelable.mDownDataQueueParcelable.setFramesPerBurst(mFramesPerBurst); parcelable.mDownDataQueueParcelable.setCapacityInFrames(getBufferCapacity()); int fdIndex = parcelable->addFileDescriptor(mAudioDataFileDescriptor, capacityInBytes); parcelable->mDownDataQueueParcelable.setupMemory(fdIndex, 0, capacityInBytes); parcelable->mDownDataQueueParcelable.setBytesPerFrame(bytesPerFrame); parcelable->mDownDataQueueParcelable.setFramesPerBurst(mFramesPerBurst); parcelable->mDownDataQueueParcelable.setCapacityInFrames(getBufferCapacity()); return AAUDIO_OK; } Loading
services/oboeservice/AAudioServiceEndpointMMAP.h +1 −1 Original line number Diff line number Diff line Loading @@ -79,7 +79,7 @@ public: void onRoutingChanged(audio_port_handle_t portHandle) override; // ------------------------------------------------------------------------------ aaudio_result_t getDownDataDescription(AudioEndpointParcelable &parcelable); aaudio_result_t getDownDataDescription(AudioEndpointParcelable* parcelable); int64_t getHardwareTimeOffsetNanos() const { return mHardwareTimeOffsetNanos; Loading
services/oboeservice/AAudioServiceStreamBase.cpp +173 −79 Original line number Diff line number Diff line Loading @@ -34,23 +34,25 @@ #include "AAudioService.h" #include "AAudioServiceEndpoint.h" #include "AAudioServiceStreamBase.h" #include "TimestampScheduler.h" using namespace android; // TODO just import names needed using namespace aaudio; // TODO just import names needed using content::AttributionSourceState; static const int64_t TIMEOUT_NANOS = 3LL * 1000 * 1000 * 1000; /** * Base class for streams in the service. * @return */ AAudioServiceStreamBase::AAudioServiceStreamBase(AAudioService &audioService) : mTimestampThread("AATime") : mCommandThread("AACommand") , mAtomicStreamTimestamp() , mAudioService(audioService) { mMmapClient.attributionSource = AttributionSourceState(); mThreadEnabled = true; } AAudioServiceStreamBase::~AAudioServiceStreamBase() { Loading @@ -70,6 +72,13 @@ AAudioServiceStreamBase::~AAudioServiceStreamBase() { || getState() == AAUDIO_STREAM_STATE_UNINITIALIZED), "service stream %p still open, state = %d", this, getState()); // Stop the command thread before destroying. if (mThreadEnabled) { mThreadEnabled = false; mCommandQueue.stopWaiting(); mCommandThread.stop(); } } std::string AAudioServiceStreamBase::dumpHeader() { Loading Loading @@ -166,6 +175,16 @@ aaudio_result_t AAudioServiceStreamBase::open(const aaudio::AAudioStreamRequest mFramesPerBurst = mServiceEndpoint->getFramesPerBurst(); copyFrom(*mServiceEndpoint); } // Make sure this object does not get deleted before the run() method // can protect it by making a strong pointer. mThreadEnabled = true; incStrong(nullptr); // See run() method. result = mCommandThread.start(this); if (result != AAUDIO_OK) { decStrong(nullptr); // run() can't do it so we have to do it here. goto error; } return result; error: Loading @@ -174,8 +193,16 @@ error: } aaudio_result_t AAudioServiceStreamBase::close() { std::lock_guard<std::mutex> lock(mLock); return close_l(); auto command = std::make_shared<AAudioCommand>( CLOSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); aaudio_result_t result = mCommandQueue.sendCommand(command); // Stop the command thread as the stream is closed. mThreadEnabled = false; mCommandQueue.stopWaiting(); mCommandThread.stop(); return result; } aaudio_result_t AAudioServiceStreamBase::close_l() { Loading @@ -183,8 +210,7 @@ aaudio_result_t AAudioServiceStreamBase::close_l() { return AAUDIO_OK; } // This will call stopTimestampThread() and also stop the stream, // just in case it was not already stopped. // This will stop the stream, just in case it was not already stopped. stop_l(); aaudio_result_t result = AAUDIO_OK; Loading Loading @@ -224,8 +250,12 @@ aaudio_result_t AAudioServiceStreamBase::startDevice() { * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete. */ aaudio_result_t AAudioServiceStreamBase::start() { std::lock_guard<std::mutex> lock(mLock); auto command = std::make_shared<AAudioCommand>( START, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::start_l() { const int64_t beginNs = AudioClock::getNanoseconds(); aaudio_result_t result = AAUDIO_OK; Loading Loading @@ -261,15 +291,6 @@ aaudio_result_t AAudioServiceStreamBase::start() { // This should happen at the end of the start. sendServiceEvent(AAUDIO_SERVICE_EVENT_STARTED); setState(AAUDIO_STREAM_STATE_STARTED); mThreadEnabled.store(true); // Make sure this object does not get deleted before the run() method // can protect it by making a strong pointer. incStrong(nullptr); // See run() method. result = mTimestampThread.start(this); if (result != AAUDIO_OK) { decStrong(nullptr); // run() can't do it so we have to do it here. goto error; } return result; Loading @@ -279,8 +300,9 @@ error: } aaudio_result_t AAudioServiceStreamBase::pause() { std::lock_guard<std::mutex> lock(mLock); return pause_l(); auto command = std::make_shared<AAudioCommand>( PAUSE, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::pause_l() { Loading @@ -298,12 +320,6 @@ aaudio_result_t AAudioServiceStreamBase::pause_l() { .set(AMEDIAMETRICS_PROP_STATUS, (int32_t)result) .record(); }); result = stopTimestampThread(); if (result != AAUDIO_OK) { disconnect_l(); return result; } sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote(); if (endpoint == nullptr) { ALOGE("%s() has no endpoint", __func__); Loading @@ -322,8 +338,9 @@ aaudio_result_t AAudioServiceStreamBase::pause_l() { } aaudio_result_t AAudioServiceStreamBase::stop() { std::lock_guard<std::mutex> lock(mLock); return stop_l(); auto command = std::make_shared<AAudioCommand>( STOP, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::stop_l() { Loading @@ -343,12 +360,6 @@ aaudio_result_t AAudioServiceStreamBase::stop_l() { setState(AAUDIO_STREAM_STATE_STOPPING); // Temporarily unlock because we are joining the timestamp thread and it may try // to acquire mLock. mLock.unlock(); result = stopTimestampThread(); mLock.lock(); if (result != AAUDIO_OK) { disconnect_l(); return result; Loading @@ -373,17 +384,13 @@ aaudio_result_t AAudioServiceStreamBase::stop_l() { return result; } aaudio_result_t AAudioServiceStreamBase::stopTimestampThread() { aaudio_result_t result = AAUDIO_OK; // clear flag that tells thread to loop if (mThreadEnabled.exchange(false)) { result = mTimestampThread.stop(); } return result; aaudio_result_t AAudioServiceStreamBase::flush() { auto command = std::make_shared<AAudioCommand>( FLUSH, nullptr, true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::flush() { std::lock_guard<std::mutex> lock(mLock); aaudio_result_t AAudioServiceStreamBase::flush_l() { aaudio_result_t result = AAudio_isFlushAllowed(getState()); if (result != AAUDIO_OK) { return result; Loading @@ -404,48 +411,111 @@ aaudio_result_t AAudioServiceStreamBase::flush() { return AAUDIO_OK; } // implement Runnable, periodically send timestamps to client // implement Runnable, periodically send timestamps to client and process commands from queue. __attribute__((no_sanitize("integer"))) void AAudioServiceStreamBase::run() { ALOGD("%s() %s entering >>>>>>>>>>>>>> TIMESTAMPS", __func__, getTypeText()); ALOGD("%s() %s entering >>>>>>>>>>>>>> COMMANDS", __func__, getTypeText()); // Hold onto the ref counted stream until the end. android::sp<AAudioServiceStreamBase> holdStream(this); TimestampScheduler timestampScheduler; int64_t nextTime; // Balance the incStrong from when the thread was launched. holdStream->decStrong(nullptr); timestampScheduler.setBurstPeriod(mFramesPerBurst, getSampleRate()); timestampScheduler.start(AudioClock::getNanoseconds()); int64_t nextTime = timestampScheduler.nextAbsoluteTime(); // Taking mLock while starting the thread. All the operation must be able to // run with holding the lock. std::scoped_lock<std::mutex> _l(mLock); int32_t loopCount = 0; aaudio_result_t result = AAUDIO_OK; while (mThreadEnabled.load()) { loopCount++; if (AudioClock::getNanoseconds() >= nextTime) { result = sendCurrentTimestamp(); if (result != AAUDIO_OK) { ALOGE("%s() timestamp thread got result = %d", __func__, result); int64_t timeoutNanos = -1; if (isRunning()) { timeoutNanos = nextTime - AudioClock::getNanoseconds(); timeoutNanos = std::max<int64_t>(0, timeoutNanos); } auto command = mCommandQueue.waitForCommand(timeoutNanos); if (!mThreadEnabled) { // Break the loop if the thread is disabled. break; } nextTime = timestampScheduler.nextAbsoluteTime(); if (isRunning() && AudioClock::getNanoseconds() >= nextTime) { // It is time to update timestamp. if (sendCurrentTimestamp_l() != AAUDIO_OK) { ALOGE("Failed to send current timestamp, stop updating timestamp"); disconnect_l(); } else { // Sleep until it is time to send the next timestamp. // TODO Wait for a signal with a timeout so that we can stop more quickly. AudioClock::sleepUntilNanoTime(nextTime); nextTime = timestampScheduler.nextAbsoluteTime(); } } // This was moved from the calls in stop_l() and pause_l(), which could cause a deadlock // if it resulted in a call to disconnect. if (result == AAUDIO_OK) { (void) sendCurrentTimestamp(); if (command != nullptr) { std::scoped_lock<std::mutex> _commandLock(command->lock); switch (command->operationCode) { case START: command->result = start_l(); timestampScheduler.setBurstPeriod(mFramesPerBurst, getSampleRate()); timestampScheduler.start(AudioClock::getNanoseconds()); nextTime = timestampScheduler.nextAbsoluteTime(); break; case PAUSE: command->result = pause_l(); break; case STOP: command->result = stop_l(); break; case FLUSH: command->result = flush_l(); break; case CLOSE: command->result = close_l(); break; case DISCONNECT: disconnect_l(); break; case REGISTER_AUDIO_THREAD: { RegisterAudioThreadParam *param = (RegisterAudioThreadParam *) command->parameter.get(); command->result = param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT : registerAudioThread_l(param->mOwnerPid, param->mClientThreadId, param->mPriority); } ALOGD("%s() %s exiting after %d loops <<<<<<<<<<<<<< TIMESTAMPS", break; case UNREGISTER_AUDIO_THREAD: { UnregisterAudioThreadParam *param = (UnregisterAudioThreadParam *) command->parameter.get(); command->result = param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT : unregisterAudioThread_l(param->mClientThreadId); } break; case GET_DESCRIPTION: { GetDescriptionParam *param = (GetDescriptionParam *) command->parameter.get(); command->result = param == nullptr ? AAUDIO_ERROR_ILLEGAL_ARGUMENT : getDescription_l(param->mParcelable); } break; default: ALOGE("Invalid command op code: %d", command->operationCode); break; } if (command->isWaitingForReply) { command->isWaitingForReply = false; command->conditionVariable.notify_one(); } } } ALOGD("%s() %s exiting after %d loops <<<<<<<<<<<<<< COMMANDS", __func__, getTypeText(), loopCount); } void AAudioServiceStreamBase::disconnect() { std::lock_guard<std::mutex> lock(mLock); disconnect_l(); auto command = std::make_shared<AAudioCommand>(DISCONNECT); mCommandQueue.sendCommand(command); } void AAudioServiceStreamBase::disconnect_l() { Loading @@ -461,15 +531,23 @@ void AAudioServiceStreamBase::disconnect_l() { } } aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId, int priority) { std::lock_guard<std::mutex> lock(mLock); aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId, int priority) { const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review auto command = std::make_shared<AAudioCommand>( REGISTER_AUDIO_THREAD, std::make_shared<RegisterAudioThreadParam>(ownerPid, clientThreadId, priority), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::registerAudioThread_l( pid_t ownerPid, pid_t clientThreadId, int priority) { aaudio_result_t result = AAUDIO_OK; if (getRegisteredThread() != AAudioServiceStreamBase::ILLEGAL_THREAD_ID) { ALOGE("AAudioService::registerAudioThread(), thread already registered"); result = AAUDIO_ERROR_INVALID_STATE; } else { const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review setRegisteredThread(clientThreadId); int err = android::requestPriority(ownerPid, clientThreadId, priority, true /* isForApp */); Loading @@ -483,7 +561,15 @@ aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadI } aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread(pid_t clientThreadId) { std::lock_guard<std::mutex> lock(mLock); auto command = std::make_shared<AAudioCommand>( UNREGISTER_AUDIO_THREAD, std::make_shared<UnregisterAudioThreadParam>(clientThreadId), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread_l(pid_t clientThreadId) { aaudio_result_t result = AAUDIO_OK; if (getRegisteredThread() != clientThreadId) { ALOGE("%s(), wrong thread", __func__); Loading Loading @@ -552,7 +638,7 @@ aaudio_result_t AAudioServiceStreamBase::sendXRunCount(int32_t xRunCount) { return sendServiceEvent(AAUDIO_SERVICE_EVENT_XRUN, (int64_t) xRunCount); } aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() { aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp_l() { AAudioServiceMessage command; // It is not worth filling up the queue with timestamps. // That can cause the stream to get suspended. Loading @@ -562,7 +648,7 @@ aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() { } // Send a timestamp for the clock model. aaudio_result_t result = getFreeRunningPosition(&command.timestamp.position, aaudio_result_t result = getFreeRunningPosition_l(&command.timestamp.position, &command.timestamp.timestamp); if (result == AAUDIO_OK) { ALOGV("%s() SERVICE %8lld at %lld", __func__, Loading @@ -573,7 +659,7 @@ aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() { if (result == AAUDIO_OK) { // Send a hardware timestamp for presentation time. result = getHardwareTimestamp(&command.timestamp.position, result = getHardwareTimestamp_l(&command.timestamp.position, &command.timestamp.timestamp); if (result == AAUDIO_OK) { ALOGV("%s() HARDWARE %8lld at %lld", __func__, Loading @@ -596,7 +682,15 @@ aaudio_result_t AAudioServiceStreamBase::sendCurrentTimestamp() { * used to communicate with the underlying HAL or Service. */ aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) { std::lock_guard<std::mutex> lock(mLock); auto command = std::make_shared<AAudioCommand>( GET_DESCRIPTION, std::make_shared<GetDescriptionParam>(&parcelable), true /*waitForReply*/, TIMEOUT_NANOS); return mCommandQueue.sendCommand(command); } aaudio_result_t AAudioServiceStreamBase::getDescription_l(AudioEndpointParcelable* parcelable) { { std::lock_guard<std::mutex> lock(mUpMessageQueueLock); if (mUpMessageQueue == nullptr) { Loading @@ -605,9 +699,9 @@ aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable } // Gather information on the message queue. mUpMessageQueue->fillParcelable(parcelable, parcelable.mUpMessageQueueParcelable); parcelable->mUpMessageQueueParcelable); } return getAudioDataDescription(parcelable); return getAudioDataDescription_l(parcelable); } void AAudioServiceStreamBase::onVolumeChanged(float volume) { Loading