Loading media/libmediatranscoding/transcoder/MediaSampleReaderNDK.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -181,6 +181,11 @@ media_status_t MediaSampleReaderNDK::waitForTrack_l(int trackIndex, if (mEosReached) { return AMEDIA_ERROR_END_OF_STREAM; } if (!mEnforceSequentialAccess) { return moveToTrack_l(trackIndex); } return AMEDIA_OK; } Loading media/libmediatranscoding/transcoder/MediaSampleWriter.cpp +18 −14 Original line number Diff line number Diff line Loading @@ -79,7 +79,7 @@ std::shared_ptr<MediaSampleWriter> MediaSampleWriter::Create() { MediaSampleWriter::~MediaSampleWriter() { if (mState == STARTED) { stop(); // Join thread. stop(); } } Loading Loading @@ -169,38 +169,41 @@ bool MediaSampleWriter::start() { } mState = STARTED; mThread = std::thread([this] { media_status_t status = writeSamples(); std::thread([this] { bool wasStopped = false; media_status_t status = writeSamples(&wasStopped); if (auto callbacks = mCallbacks.lock()) { if (wasStopped && status == AMEDIA_OK) { callbacks->onStopped(this); } else { callbacks->onFinished(this, status); } }); } }).detach(); return true; } bool MediaSampleWriter::stop() { void MediaSampleWriter::stop() { { std::scoped_lock lock(mMutex); if (mState != STARTED) { LOG(ERROR) << "Sample writer is not started."; return false; return; } mState = STOPPED; } mSampleSignal.notify_all(); mThread.join(); return true; } media_status_t MediaSampleWriter::writeSamples() { media_status_t MediaSampleWriter::writeSamples(bool* wasStopped) { media_status_t muxerStatus = mMuxer->start(); if (muxerStatus != AMEDIA_OK) { LOG(ERROR) << "Error starting muxer: " << muxerStatus; return muxerStatus; } media_status_t writeStatus = runWriterLoop(); media_status_t writeStatus = runWriterLoop(wasStopped); if (writeStatus != AMEDIA_OK) { LOG(ERROR) << "Error writing samples: " << writeStatus; } Loading @@ -213,7 +216,7 @@ media_status_t MediaSampleWriter::writeSamples() { return writeStatus != AMEDIA_OK ? writeStatus : muxerStatus; } media_status_t MediaSampleWriter::runWriterLoop() NO_THREAD_SAFETY_ANALYSIS { media_status_t MediaSampleWriter::runWriterLoop(bool* wasStopped) NO_THREAD_SAFETY_ANALYSIS { AMediaCodecBufferInfo bufferInfo; int32_t lastProgressUpdate = 0; int trackEosCount = 0; Loading Loading @@ -242,8 +245,9 @@ media_status_t MediaSampleWriter::runWriterLoop() NO_THREAD_SAFETY_ANALYSIS { mSampleSignal.wait(lock); } if (mState != STARTED) { return AMEDIA_ERROR_UNKNOWN; // TODO(lnilsson): Custom error code. if (mState == STOPPED) { *wasStopped = true; return AMEDIA_OK; } auto& topEntry = mSampleQueue.top(); Loading media/libmediatranscoding/transcoder/MediaTrackTranscoder.cpp +22 −19 Original line number Diff line number Diff line Loading @@ -69,41 +69,44 @@ bool MediaTrackTranscoder::start() { LOG(ERROR) << "TrackTranscoder must be configured before started"; return false; } mState = STARTED; std::thread([this] { bool stopped = false; media_status_t status = runTranscodeLoop(&stopped); mTranscodingThread = std::thread([this] { media_status_t status = runTranscodeLoop(); // Output an EOS sample if the transcoder was stopped. if (stopped) { auto sample = std::make_shared<MediaSample>(); sample->info.flags = SAMPLE_FLAG_END_OF_STREAM; onOutputSampleAvailable(sample); } // Notify the client. if (auto callbacks = mTranscoderCallback.lock()) { if (status != AMEDIA_OK) { callbacks->onTrackError(this, status); } else { if (stopped) { callbacks->onTrackStopped(this); } else if (status == AMEDIA_OK) { callbacks->onTrackFinished(this); } else { callbacks->onTrackError(this, status); } } }); }).detach(); mState = STARTED; return true; } bool MediaTrackTranscoder::stop() { void MediaTrackTranscoder::stop(bool stopOnSyncSample) { std::scoped_lock lock{mStateMutex}; if (mState == STARTED) { if (mState == STARTED || (mStopRequest == STOP_ON_SYNC && !stopOnSyncSample)) { mStopRequest = stopOnSyncSample ? STOP_ON_SYNC : STOP_NOW; abortTranscodeLoop(); mMediaSampleReader->setEnforceSequentialAccess(false); mTranscodingThread.join(); { std::scoped_lock lock{mSampleMutex}; mSampleQueue.abort(); // Release any buffered samples. } mState = STOPPED; return true; } else { LOG(WARNING) << "TrackTranscoder must be started before stopped"; } LOG(ERROR) << "TrackTranscoder must be started before stopped"; return false; } void MediaTrackTranscoder::notifyTrackFormatAvailable() { Loading media/libmediatranscoding/transcoder/MediaTranscoder.cpp +148 −53 Original line number Diff line number Diff line Loading @@ -69,38 +69,67 @@ static AMediaFormat* mergeMediaFormats(AMediaFormat* base, AMediaFormat* overlay return format; } void MediaTranscoder::sendCallback(media_status_t status) { // If the transcoder is already cancelled explicitly, don't send any error callbacks. // Tracks and sample writer will report errors for abort. However, currently we can't // tell it apart from real errors. Ideally we still want to report real errors back // to client, as there is a small chance that explicit abort and the real error come // at around the same time, we should report that if abort has a specific error code. // On the other hand, if the transcoder actually finished (status is AMEDIA_OK) at around // the same time of the abort, we should still report the finish back to the client. if (mCancelled && status != AMEDIA_OK) { return; void MediaTranscoder::onThreadFinished(const void* thread, media_status_t threadStatus, bool threadStopped) { LOG(DEBUG) << "Thread " << thread << " finished with status " << threadStatus << " stopped " << threadStopped; // Stop all threads if one reports an error. if (threadStatus != AMEDIA_OK) { requestStop(false /* stopOnSync */); } bool expected = false; if (mCallbackSent.compare_exchange_strong(expected, true)) { if (status == AMEDIA_OK) { mCallbacks->onFinished(this); std::scoped_lock lock{mThreadStateMutex}; // Record the change. mThreadStates[thread] = DONE; if (threadStatus != AMEDIA_OK && mTranscoderStatus == AMEDIA_OK) { mTranscoderStatus = threadStatus; } mTranscoderStopped |= threadStopped; // Check if all threads are done. Note that if all transcoders have stopped but the sample // writer has not yet started, it never will. bool transcodersDone = true; ThreadState sampleWriterState = PENDING; for (const auto& it : mThreadStates) { LOG(DEBUG) << " Thread " << it.first << " state" << it.second; if (it.first == static_cast<const void*>(mSampleWriter.get())) { sampleWriterState = it.second; } else { mCallbacks->onError(this, status); transcodersDone &= (it.second == DONE); } } if (!transcodersDone || sampleWriterState == RUNNING) { return; } // Transcoding is done and the callback to the client has been sent, so tear down the // pipeline but do it asynchronously to avoid deadlocks. If an error occurred, client // should clean up the file. std::thread asyncCancelThread{[self = shared_from_this()] { self->cancel(); }}; asyncCancelThread.detach(); // All done. Send callback asynchronously and wake up threads waiting in cancel/pause. mThreadsDone = true; if (!mCallbackSent) { std::thread asyncNotificationThread{[this, self = shared_from_this(), status = mTranscoderStatus, stopped = mTranscoderStopped] { // If the transcoder was stopped that means a caller is waiting in stop or pause // in which case we don't send a callback. if (status != AMEDIA_OK) { mCallbacks->onError(this, status); } else if (!stopped) { mCallbacks->onFinished(this); } mThreadsDoneSignal.notify_all(); }}; asyncNotificationThread.detach(); mCallbackSent = true; } } void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transcoder) { LOG(INFO) << "TrackTranscoder " << transcoder << " format available."; LOG(DEBUG) << "TrackTranscoder " << transcoder << " format available."; std::scoped_lock lock{mTracksAddedMutex}; const void* sampleWriterPtr = static_cast<const void*>(mSampleWriter.get()); // Ignore duplicate format change. if (mTracksAdded.count(transcoder) > 0) { Loading @@ -111,7 +140,7 @@ void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transco auto consumer = mSampleWriter->addTrack(transcoder->getOutputFormat()); if (consumer == nullptr) { LOG(ERROR) << "Unable to add track to sample writer."; sendCallback(AMEDIA_ERROR_UNKNOWN); onThreadFinished(sampleWriterPtr, AMEDIA_ERROR_UNKNOWN, false /* stopped */); return; } Loading @@ -119,34 +148,57 @@ void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transco mutableTranscoder->setSampleConsumer(consumer); mTracksAdded.insert(transcoder); bool errorStarting = false; if (mTracksAdded.size() == mTrackTranscoders.size()) { // Enable sequential access mode on the sample reader to achieve optimal read performance. // This has to wait until all tracks have delivered their output formats and the sample // writer is started. Otherwise the tracks will not get their output sample queues drained // and the transcoder could hang due to one track running out of buffers and blocking the // other tracks from reading source samples before they could output their formats. std::scoped_lock lock{mThreadStateMutex}; // Don't start the sample writer if a stop already has been requested. if (!mSampleWriterStopped) { if (!mCancelled) { mSampleReader->setEnforceSequentialAccess(true); LOG(INFO) << "Starting sample writer."; bool started = mSampleWriter->start(); if (!started) { LOG(ERROR) << "Unable to start sample writer."; sendCallback(AMEDIA_ERROR_UNKNOWN); } LOG(DEBUG) << "Starting sample writer."; errorStarting = !mSampleWriter->start(); if (!errorStarting) { mThreadStates[sampleWriterPtr] = RUNNING; } } } if (errorStarting) { LOG(ERROR) << "Unable to start sample writer."; onThreadFinished(sampleWriterPtr, AMEDIA_ERROR_UNKNOWN, false /* stopped */); } } void MediaTranscoder::onTrackFinished(const MediaTrackTranscoder* transcoder) { LOG(DEBUG) << "TrackTranscoder " << transcoder << " finished"; onThreadFinished(static_cast<const void*>(transcoder), AMEDIA_OK, false /* stopped */); } void MediaTranscoder::onTrackStopped(const MediaTrackTranscoder* transcoder) { LOG(DEBUG) << "TrackTranscoder " << transcoder << " stopped"; onThreadFinished(static_cast<const void*>(transcoder), AMEDIA_OK, true /* stopped */); } void MediaTranscoder::onTrackError(const MediaTrackTranscoder* transcoder, media_status_t status) { LOG(ERROR) << "TrackTranscoder " << transcoder << " returned error " << status; sendCallback(status); onThreadFinished(static_cast<const void*>(transcoder), status, false /* stopped */); } void MediaTranscoder::onFinished(const MediaSampleWriter* writer, media_status_t status) { LOG(status == AMEDIA_OK ? DEBUG : ERROR) << "Sample writer finished with status " << status; onThreadFinished(static_cast<const void*>(writer), status, false /* stopped */); } void MediaTranscoder::onFinished(const MediaSampleWriter* writer __unused, media_status_t status) { LOG((status != AMEDIA_OK) ? ERROR : DEBUG) << "Sample writer finished with status " << status; sendCallback(status); void MediaTranscoder::onStopped(const MediaSampleWriter* writer) { LOG(DEBUG) << "Sample writer " << writer << " stopped"; onThreadFinished(static_cast<const void*>(writer), AMEDIA_OK, true /* stopped */); } void MediaTranscoder::onProgressUpdate(const MediaSampleWriter* writer __unused, int32_t progress) { Loading Loading @@ -277,6 +329,9 @@ media_status_t MediaTranscoder::configureTrackFormat(size_t trackIndex, AMediaFo return status; } std::scoped_lock lock{mThreadStateMutex}; mThreadStates[static_cast<const void*>(transcoder.get())] = PENDING; mTrackTranscoders.emplace_back(std::move(transcoder)); return AMEDIA_OK; } Loading @@ -301,6 +356,8 @@ media_status_t MediaTranscoder::configureDestination(int fd) { return AMEDIA_ERROR_UNKNOWN; } std::scoped_lock lock{mThreadStateMutex}; mThreadStates[static_cast<const void*>(mSampleWriter.get())] = PENDING; return AMEDIA_OK; } Loading @@ -314,42 +371,80 @@ media_status_t MediaTranscoder::start() { } // Start transcoders bool started = true; { std::scoped_lock lock{mThreadStateMutex}; for (auto& transcoder : mTrackTranscoders) { bool started = transcoder->start(); if (!(started = transcoder->start())) { break; } mThreadStates[static_cast<const void*>(transcoder.get())] = RUNNING; } } if (!started) { LOG(ERROR) << "Unable to start track transcoder."; cancel(); return AMEDIA_ERROR_UNKNOWN; } return AMEDIA_OK; } media_status_t MediaTranscoder::requestStop(bool stopOnSync) { std::scoped_lock lock{mThreadStateMutex}; if (mCancelled) { LOG(DEBUG) << "MediaTranscoder already cancelled"; return AMEDIA_ERROR_UNSUPPORTED; } if (!stopOnSync) { mSampleWriterStopped = true; mSampleWriter->stop(); } mSampleReader->setEnforceSequentialAccess(false); for (auto& transcoder : mTrackTranscoders) { transcoder->stop(stopOnSync); } mCancelled = true; return AMEDIA_OK; } media_status_t MediaTranscoder::pause(std::shared_ptr<ndk::ScopedAParcel>* pausedState) { // TODO: write internal states to parcel. *pausedState = std::shared_ptr<::ndk::ScopedAParcel>(new ::ndk::ScopedAParcel()); return cancel(); void MediaTranscoder::waitForThreads() NO_THREAD_SAFETY_ANALYSIS { std::unique_lock lock{mThreadStateMutex}; while (!mThreadsDone) { mThreadsDoneSignal.wait(lock); } } media_status_t MediaTranscoder::resume() { // TODO: restore internal states from parcel. return start(); media_status_t MediaTranscoder::pause(std::shared_ptr<ndk::ScopedAParcel>* pausedState) { media_status_t status = requestStop(true /* stopOnSync */); if (status != AMEDIA_OK) { return status; } media_status_t MediaTranscoder::cancel() { bool expected = false; if (!mCancelled.compare_exchange_strong(expected, true)) { // Already cancelled. waitForThreads(); // TODO: write internal states to parcel. *pausedState = std::shared_ptr<::ndk::ScopedAParcel>(new ::ndk::ScopedAParcel()); return AMEDIA_OK; } mSampleWriter->stop(); mSampleReader->setEnforceSequentialAccess(false); for (auto& transcoder : mTrackTranscoders) { transcoder->stop(); media_status_t MediaTranscoder::cancel() { media_status_t status = requestStop(false /* stopOnSync */); if (status != AMEDIA_OK) { return status; } waitForThreads(); // TODO: Release transcoders? return AMEDIA_OK; } media_status_t MediaTranscoder::resume() { // TODO: restore internal states from parcel. return start(); } } // namespace android media/libmediatranscoding/transcoder/PassthroughTrackTranscoder.cpp +15 −9 Original line number Diff line number Diff line Loading @@ -93,9 +93,10 @@ media_status_t PassthroughTrackTranscoder::configureDestinationFormat( return AMEDIA_OK; } media_status_t PassthroughTrackTranscoder::runTranscodeLoop() { media_status_t PassthroughTrackTranscoder::runTranscodeLoop(bool* stopped) { MediaSampleInfo info; std::shared_ptr<MediaSample> sample; bool eosReached = false; // Notify the track format as soon as we start. It's same as the source format. notifyTrackFormatAvailable(); Loading @@ -106,18 +107,18 @@ media_status_t PassthroughTrackTranscoder::runTranscodeLoop() { }; // Move samples until EOS is reached or transcoding is stopped. while (!mStopRequested && !mEosFromSource) { while (mStopRequest != STOP_NOW && !eosReached) { media_status_t status = mMediaSampleReader->getSampleInfoForTrack(mTrackIndex, &info); if (status == AMEDIA_OK) { uint8_t* buffer = mBufferPool->getBufferWithSize(info.size); if (buffer == nullptr) { if (mStopRequested) { if (mStopRequest == STOP_NOW) { break; } LOG(ERROR) << "Unable to get buffer from pool"; return AMEDIA_ERROR_IO; // TODO: Custom error codes? return AMEDIA_ERROR_UNKNOWN; } sample = MediaSample::createWithReleaseCallback( Loading @@ -131,7 +132,7 @@ media_status_t PassthroughTrackTranscoder::runTranscodeLoop() { } else if (status == AMEDIA_ERROR_END_OF_STREAM) { sample = std::make_shared<MediaSample>(); mEosFromSource = true; eosReached = true; } else { LOG(ERROR) << "Unable to get next sample info. Aborting transcode."; return status; Loading @@ -139,18 +140,23 @@ media_status_t PassthroughTrackTranscoder::runTranscodeLoop() { sample->info = info; onOutputSampleAvailable(sample); if (mStopRequest == STOP_ON_SYNC && info.flags & SAMPLE_FLAG_SYNC_SAMPLE) { break; } } if (mStopRequested && !mEosFromSource) { return AMEDIA_ERROR_UNKNOWN; // TODO: Custom error codes? if (mStopRequest != NONE && !eosReached) { *stopped = true; } return AMEDIA_OK; } void PassthroughTrackTranscoder::abortTranscodeLoop() { mStopRequested = true; if (mStopRequest == STOP_NOW) { mBufferPool->abort(); } } std::shared_ptr<AMediaFormat> PassthroughTrackTranscoder::getOutputFormat() const { return mSourceFormat; Loading Loading
media/libmediatranscoding/transcoder/MediaSampleReaderNDK.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -181,6 +181,11 @@ media_status_t MediaSampleReaderNDK::waitForTrack_l(int trackIndex, if (mEosReached) { return AMEDIA_ERROR_END_OF_STREAM; } if (!mEnforceSequentialAccess) { return moveToTrack_l(trackIndex); } return AMEDIA_OK; } Loading
media/libmediatranscoding/transcoder/MediaSampleWriter.cpp +18 −14 Original line number Diff line number Diff line Loading @@ -79,7 +79,7 @@ std::shared_ptr<MediaSampleWriter> MediaSampleWriter::Create() { MediaSampleWriter::~MediaSampleWriter() { if (mState == STARTED) { stop(); // Join thread. stop(); } } Loading Loading @@ -169,38 +169,41 @@ bool MediaSampleWriter::start() { } mState = STARTED; mThread = std::thread([this] { media_status_t status = writeSamples(); std::thread([this] { bool wasStopped = false; media_status_t status = writeSamples(&wasStopped); if (auto callbacks = mCallbacks.lock()) { if (wasStopped && status == AMEDIA_OK) { callbacks->onStopped(this); } else { callbacks->onFinished(this, status); } }); } }).detach(); return true; } bool MediaSampleWriter::stop() { void MediaSampleWriter::stop() { { std::scoped_lock lock(mMutex); if (mState != STARTED) { LOG(ERROR) << "Sample writer is not started."; return false; return; } mState = STOPPED; } mSampleSignal.notify_all(); mThread.join(); return true; } media_status_t MediaSampleWriter::writeSamples() { media_status_t MediaSampleWriter::writeSamples(bool* wasStopped) { media_status_t muxerStatus = mMuxer->start(); if (muxerStatus != AMEDIA_OK) { LOG(ERROR) << "Error starting muxer: " << muxerStatus; return muxerStatus; } media_status_t writeStatus = runWriterLoop(); media_status_t writeStatus = runWriterLoop(wasStopped); if (writeStatus != AMEDIA_OK) { LOG(ERROR) << "Error writing samples: " << writeStatus; } Loading @@ -213,7 +216,7 @@ media_status_t MediaSampleWriter::writeSamples() { return writeStatus != AMEDIA_OK ? writeStatus : muxerStatus; } media_status_t MediaSampleWriter::runWriterLoop() NO_THREAD_SAFETY_ANALYSIS { media_status_t MediaSampleWriter::runWriterLoop(bool* wasStopped) NO_THREAD_SAFETY_ANALYSIS { AMediaCodecBufferInfo bufferInfo; int32_t lastProgressUpdate = 0; int trackEosCount = 0; Loading Loading @@ -242,8 +245,9 @@ media_status_t MediaSampleWriter::runWriterLoop() NO_THREAD_SAFETY_ANALYSIS { mSampleSignal.wait(lock); } if (mState != STARTED) { return AMEDIA_ERROR_UNKNOWN; // TODO(lnilsson): Custom error code. if (mState == STOPPED) { *wasStopped = true; return AMEDIA_OK; } auto& topEntry = mSampleQueue.top(); Loading
media/libmediatranscoding/transcoder/MediaTrackTranscoder.cpp +22 −19 Original line number Diff line number Diff line Loading @@ -69,41 +69,44 @@ bool MediaTrackTranscoder::start() { LOG(ERROR) << "TrackTranscoder must be configured before started"; return false; } mState = STARTED; std::thread([this] { bool stopped = false; media_status_t status = runTranscodeLoop(&stopped); mTranscodingThread = std::thread([this] { media_status_t status = runTranscodeLoop(); // Output an EOS sample if the transcoder was stopped. if (stopped) { auto sample = std::make_shared<MediaSample>(); sample->info.flags = SAMPLE_FLAG_END_OF_STREAM; onOutputSampleAvailable(sample); } // Notify the client. if (auto callbacks = mTranscoderCallback.lock()) { if (status != AMEDIA_OK) { callbacks->onTrackError(this, status); } else { if (stopped) { callbacks->onTrackStopped(this); } else if (status == AMEDIA_OK) { callbacks->onTrackFinished(this); } else { callbacks->onTrackError(this, status); } } }); }).detach(); mState = STARTED; return true; } bool MediaTrackTranscoder::stop() { void MediaTrackTranscoder::stop(bool stopOnSyncSample) { std::scoped_lock lock{mStateMutex}; if (mState == STARTED) { if (mState == STARTED || (mStopRequest == STOP_ON_SYNC && !stopOnSyncSample)) { mStopRequest = stopOnSyncSample ? STOP_ON_SYNC : STOP_NOW; abortTranscodeLoop(); mMediaSampleReader->setEnforceSequentialAccess(false); mTranscodingThread.join(); { std::scoped_lock lock{mSampleMutex}; mSampleQueue.abort(); // Release any buffered samples. } mState = STOPPED; return true; } else { LOG(WARNING) << "TrackTranscoder must be started before stopped"; } LOG(ERROR) << "TrackTranscoder must be started before stopped"; return false; } void MediaTrackTranscoder::notifyTrackFormatAvailable() { Loading
media/libmediatranscoding/transcoder/MediaTranscoder.cpp +148 −53 Original line number Diff line number Diff line Loading @@ -69,38 +69,67 @@ static AMediaFormat* mergeMediaFormats(AMediaFormat* base, AMediaFormat* overlay return format; } void MediaTranscoder::sendCallback(media_status_t status) { // If the transcoder is already cancelled explicitly, don't send any error callbacks. // Tracks and sample writer will report errors for abort. However, currently we can't // tell it apart from real errors. Ideally we still want to report real errors back // to client, as there is a small chance that explicit abort and the real error come // at around the same time, we should report that if abort has a specific error code. // On the other hand, if the transcoder actually finished (status is AMEDIA_OK) at around // the same time of the abort, we should still report the finish back to the client. if (mCancelled && status != AMEDIA_OK) { return; void MediaTranscoder::onThreadFinished(const void* thread, media_status_t threadStatus, bool threadStopped) { LOG(DEBUG) << "Thread " << thread << " finished with status " << threadStatus << " stopped " << threadStopped; // Stop all threads if one reports an error. if (threadStatus != AMEDIA_OK) { requestStop(false /* stopOnSync */); } bool expected = false; if (mCallbackSent.compare_exchange_strong(expected, true)) { if (status == AMEDIA_OK) { mCallbacks->onFinished(this); std::scoped_lock lock{mThreadStateMutex}; // Record the change. mThreadStates[thread] = DONE; if (threadStatus != AMEDIA_OK && mTranscoderStatus == AMEDIA_OK) { mTranscoderStatus = threadStatus; } mTranscoderStopped |= threadStopped; // Check if all threads are done. Note that if all transcoders have stopped but the sample // writer has not yet started, it never will. bool transcodersDone = true; ThreadState sampleWriterState = PENDING; for (const auto& it : mThreadStates) { LOG(DEBUG) << " Thread " << it.first << " state" << it.second; if (it.first == static_cast<const void*>(mSampleWriter.get())) { sampleWriterState = it.second; } else { mCallbacks->onError(this, status); transcodersDone &= (it.second == DONE); } } if (!transcodersDone || sampleWriterState == RUNNING) { return; } // Transcoding is done and the callback to the client has been sent, so tear down the // pipeline but do it asynchronously to avoid deadlocks. If an error occurred, client // should clean up the file. std::thread asyncCancelThread{[self = shared_from_this()] { self->cancel(); }}; asyncCancelThread.detach(); // All done. Send callback asynchronously and wake up threads waiting in cancel/pause. mThreadsDone = true; if (!mCallbackSent) { std::thread asyncNotificationThread{[this, self = shared_from_this(), status = mTranscoderStatus, stopped = mTranscoderStopped] { // If the transcoder was stopped that means a caller is waiting in stop or pause // in which case we don't send a callback. if (status != AMEDIA_OK) { mCallbacks->onError(this, status); } else if (!stopped) { mCallbacks->onFinished(this); } mThreadsDoneSignal.notify_all(); }}; asyncNotificationThread.detach(); mCallbackSent = true; } } void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transcoder) { LOG(INFO) << "TrackTranscoder " << transcoder << " format available."; LOG(DEBUG) << "TrackTranscoder " << transcoder << " format available."; std::scoped_lock lock{mTracksAddedMutex}; const void* sampleWriterPtr = static_cast<const void*>(mSampleWriter.get()); // Ignore duplicate format change. if (mTracksAdded.count(transcoder) > 0) { Loading @@ -111,7 +140,7 @@ void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transco auto consumer = mSampleWriter->addTrack(transcoder->getOutputFormat()); if (consumer == nullptr) { LOG(ERROR) << "Unable to add track to sample writer."; sendCallback(AMEDIA_ERROR_UNKNOWN); onThreadFinished(sampleWriterPtr, AMEDIA_ERROR_UNKNOWN, false /* stopped */); return; } Loading @@ -119,34 +148,57 @@ void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transco mutableTranscoder->setSampleConsumer(consumer); mTracksAdded.insert(transcoder); bool errorStarting = false; if (mTracksAdded.size() == mTrackTranscoders.size()) { // Enable sequential access mode on the sample reader to achieve optimal read performance. // This has to wait until all tracks have delivered their output formats and the sample // writer is started. Otherwise the tracks will not get their output sample queues drained // and the transcoder could hang due to one track running out of buffers and blocking the // other tracks from reading source samples before they could output their formats. std::scoped_lock lock{mThreadStateMutex}; // Don't start the sample writer if a stop already has been requested. if (!mSampleWriterStopped) { if (!mCancelled) { mSampleReader->setEnforceSequentialAccess(true); LOG(INFO) << "Starting sample writer."; bool started = mSampleWriter->start(); if (!started) { LOG(ERROR) << "Unable to start sample writer."; sendCallback(AMEDIA_ERROR_UNKNOWN); } LOG(DEBUG) << "Starting sample writer."; errorStarting = !mSampleWriter->start(); if (!errorStarting) { mThreadStates[sampleWriterPtr] = RUNNING; } } } if (errorStarting) { LOG(ERROR) << "Unable to start sample writer."; onThreadFinished(sampleWriterPtr, AMEDIA_ERROR_UNKNOWN, false /* stopped */); } } void MediaTranscoder::onTrackFinished(const MediaTrackTranscoder* transcoder) { LOG(DEBUG) << "TrackTranscoder " << transcoder << " finished"; onThreadFinished(static_cast<const void*>(transcoder), AMEDIA_OK, false /* stopped */); } void MediaTranscoder::onTrackStopped(const MediaTrackTranscoder* transcoder) { LOG(DEBUG) << "TrackTranscoder " << transcoder << " stopped"; onThreadFinished(static_cast<const void*>(transcoder), AMEDIA_OK, true /* stopped */); } void MediaTranscoder::onTrackError(const MediaTrackTranscoder* transcoder, media_status_t status) { LOG(ERROR) << "TrackTranscoder " << transcoder << " returned error " << status; sendCallback(status); onThreadFinished(static_cast<const void*>(transcoder), status, false /* stopped */); } void MediaTranscoder::onFinished(const MediaSampleWriter* writer, media_status_t status) { LOG(status == AMEDIA_OK ? DEBUG : ERROR) << "Sample writer finished with status " << status; onThreadFinished(static_cast<const void*>(writer), status, false /* stopped */); } void MediaTranscoder::onFinished(const MediaSampleWriter* writer __unused, media_status_t status) { LOG((status != AMEDIA_OK) ? ERROR : DEBUG) << "Sample writer finished with status " << status; sendCallback(status); void MediaTranscoder::onStopped(const MediaSampleWriter* writer) { LOG(DEBUG) << "Sample writer " << writer << " stopped"; onThreadFinished(static_cast<const void*>(writer), AMEDIA_OK, true /* stopped */); } void MediaTranscoder::onProgressUpdate(const MediaSampleWriter* writer __unused, int32_t progress) { Loading Loading @@ -277,6 +329,9 @@ media_status_t MediaTranscoder::configureTrackFormat(size_t trackIndex, AMediaFo return status; } std::scoped_lock lock{mThreadStateMutex}; mThreadStates[static_cast<const void*>(transcoder.get())] = PENDING; mTrackTranscoders.emplace_back(std::move(transcoder)); return AMEDIA_OK; } Loading @@ -301,6 +356,8 @@ media_status_t MediaTranscoder::configureDestination(int fd) { return AMEDIA_ERROR_UNKNOWN; } std::scoped_lock lock{mThreadStateMutex}; mThreadStates[static_cast<const void*>(mSampleWriter.get())] = PENDING; return AMEDIA_OK; } Loading @@ -314,42 +371,80 @@ media_status_t MediaTranscoder::start() { } // Start transcoders bool started = true; { std::scoped_lock lock{mThreadStateMutex}; for (auto& transcoder : mTrackTranscoders) { bool started = transcoder->start(); if (!(started = transcoder->start())) { break; } mThreadStates[static_cast<const void*>(transcoder.get())] = RUNNING; } } if (!started) { LOG(ERROR) << "Unable to start track transcoder."; cancel(); return AMEDIA_ERROR_UNKNOWN; } return AMEDIA_OK; } media_status_t MediaTranscoder::requestStop(bool stopOnSync) { std::scoped_lock lock{mThreadStateMutex}; if (mCancelled) { LOG(DEBUG) << "MediaTranscoder already cancelled"; return AMEDIA_ERROR_UNSUPPORTED; } if (!stopOnSync) { mSampleWriterStopped = true; mSampleWriter->stop(); } mSampleReader->setEnforceSequentialAccess(false); for (auto& transcoder : mTrackTranscoders) { transcoder->stop(stopOnSync); } mCancelled = true; return AMEDIA_OK; } media_status_t MediaTranscoder::pause(std::shared_ptr<ndk::ScopedAParcel>* pausedState) { // TODO: write internal states to parcel. *pausedState = std::shared_ptr<::ndk::ScopedAParcel>(new ::ndk::ScopedAParcel()); return cancel(); void MediaTranscoder::waitForThreads() NO_THREAD_SAFETY_ANALYSIS { std::unique_lock lock{mThreadStateMutex}; while (!mThreadsDone) { mThreadsDoneSignal.wait(lock); } } media_status_t MediaTranscoder::resume() { // TODO: restore internal states from parcel. return start(); media_status_t MediaTranscoder::pause(std::shared_ptr<ndk::ScopedAParcel>* pausedState) { media_status_t status = requestStop(true /* stopOnSync */); if (status != AMEDIA_OK) { return status; } media_status_t MediaTranscoder::cancel() { bool expected = false; if (!mCancelled.compare_exchange_strong(expected, true)) { // Already cancelled. waitForThreads(); // TODO: write internal states to parcel. *pausedState = std::shared_ptr<::ndk::ScopedAParcel>(new ::ndk::ScopedAParcel()); return AMEDIA_OK; } mSampleWriter->stop(); mSampleReader->setEnforceSequentialAccess(false); for (auto& transcoder : mTrackTranscoders) { transcoder->stop(); media_status_t MediaTranscoder::cancel() { media_status_t status = requestStop(false /* stopOnSync */); if (status != AMEDIA_OK) { return status; } waitForThreads(); // TODO: Release transcoders? return AMEDIA_OK; } media_status_t MediaTranscoder::resume() { // TODO: restore internal states from parcel. return start(); } } // namespace android
media/libmediatranscoding/transcoder/PassthroughTrackTranscoder.cpp +15 −9 Original line number Diff line number Diff line Loading @@ -93,9 +93,10 @@ media_status_t PassthroughTrackTranscoder::configureDestinationFormat( return AMEDIA_OK; } media_status_t PassthroughTrackTranscoder::runTranscodeLoop() { media_status_t PassthroughTrackTranscoder::runTranscodeLoop(bool* stopped) { MediaSampleInfo info; std::shared_ptr<MediaSample> sample; bool eosReached = false; // Notify the track format as soon as we start. It's same as the source format. notifyTrackFormatAvailable(); Loading @@ -106,18 +107,18 @@ media_status_t PassthroughTrackTranscoder::runTranscodeLoop() { }; // Move samples until EOS is reached or transcoding is stopped. while (!mStopRequested && !mEosFromSource) { while (mStopRequest != STOP_NOW && !eosReached) { media_status_t status = mMediaSampleReader->getSampleInfoForTrack(mTrackIndex, &info); if (status == AMEDIA_OK) { uint8_t* buffer = mBufferPool->getBufferWithSize(info.size); if (buffer == nullptr) { if (mStopRequested) { if (mStopRequest == STOP_NOW) { break; } LOG(ERROR) << "Unable to get buffer from pool"; return AMEDIA_ERROR_IO; // TODO: Custom error codes? return AMEDIA_ERROR_UNKNOWN; } sample = MediaSample::createWithReleaseCallback( Loading @@ -131,7 +132,7 @@ media_status_t PassthroughTrackTranscoder::runTranscodeLoop() { } else if (status == AMEDIA_ERROR_END_OF_STREAM) { sample = std::make_shared<MediaSample>(); mEosFromSource = true; eosReached = true; } else { LOG(ERROR) << "Unable to get next sample info. Aborting transcode."; return status; Loading @@ -139,18 +140,23 @@ media_status_t PassthroughTrackTranscoder::runTranscodeLoop() { sample->info = info; onOutputSampleAvailable(sample); if (mStopRequest == STOP_ON_SYNC && info.flags & SAMPLE_FLAG_SYNC_SAMPLE) { break; } } if (mStopRequested && !mEosFromSource) { return AMEDIA_ERROR_UNKNOWN; // TODO: Custom error codes? if (mStopRequest != NONE && !eosReached) { *stopped = true; } return AMEDIA_OK; } void PassthroughTrackTranscoder::abortTranscodeLoop() { mStopRequested = true; if (mStopRequest == STOP_NOW) { mBufferPool->abort(); } } std::shared_ptr<AMediaFormat> PassthroughTrackTranscoder::getOutputFormat() const { return mSourceFormat; Loading