Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit fdb3e339 authored by Linus Nilsson's avatar Linus Nilsson
Browse files

Transcoder: Add support for pausing transcoding on a sync frame.

- Added support for stopping transcoders on a sync frame.
- Refactored MediaTrackTranscoders and MediaSampleWriter to stop()
asynchronously.
- Fixed callback and error handling logic in MediaTranscoder.
- Added tests for pause and stopping on sync frame.

Bug: 162886306
Test: Unit tests.

Change-Id: If689a10dfee198c674c4c13b865a7c56a901e075
parent f674100b
Loading
Loading
Loading
Loading
+5 −0
Original line number Diff line number Diff line
@@ -181,6 +181,11 @@ media_status_t MediaSampleReaderNDK::waitForTrack_l(int trackIndex,
    if (mEosReached) {
        return AMEDIA_ERROR_END_OF_STREAM;
    }

    if (!mEnforceSequentialAccess) {
        return moveToTrack_l(trackIndex);
    }

    return AMEDIA_OK;
}

+18 −14
Original line number Diff line number Diff line
@@ -79,7 +79,7 @@ std::shared_ptr<MediaSampleWriter> MediaSampleWriter::Create() {

MediaSampleWriter::~MediaSampleWriter() {
    if (mState == STARTED) {
        stop();  // Join thread.
        stop();
    }
}

@@ -169,38 +169,41 @@ bool MediaSampleWriter::start() {
    }

    mState = STARTED;
    mThread = std::thread([this] {
        media_status_t status = writeSamples();
    std::thread([this] {
        bool wasStopped = false;
        media_status_t status = writeSamples(&wasStopped);
        if (auto callbacks = mCallbacks.lock()) {
            if (wasStopped && status == AMEDIA_OK) {
                callbacks->onStopped(this);
            } else {
                callbacks->onFinished(this, status);
            }
    });
        }
    }).detach();
    return true;
}

bool MediaSampleWriter::stop() {
void MediaSampleWriter::stop() {
    {
        std::scoped_lock lock(mMutex);
        if (mState != STARTED) {
            LOG(ERROR) << "Sample writer is not started.";
            return false;
            return;
        }
        mState = STOPPED;
    }

    mSampleSignal.notify_all();
    mThread.join();
    return true;
}

media_status_t MediaSampleWriter::writeSamples() {
media_status_t MediaSampleWriter::writeSamples(bool* wasStopped) {
    media_status_t muxerStatus = mMuxer->start();
    if (muxerStatus != AMEDIA_OK) {
        LOG(ERROR) << "Error starting muxer: " << muxerStatus;
        return muxerStatus;
    }

    media_status_t writeStatus = runWriterLoop();
    media_status_t writeStatus = runWriterLoop(wasStopped);
    if (writeStatus != AMEDIA_OK) {
        LOG(ERROR) << "Error writing samples: " << writeStatus;
    }
@@ -213,7 +216,7 @@ media_status_t MediaSampleWriter::writeSamples() {
    return writeStatus != AMEDIA_OK ? writeStatus : muxerStatus;
}

media_status_t MediaSampleWriter::runWriterLoop() NO_THREAD_SAFETY_ANALYSIS {
media_status_t MediaSampleWriter::runWriterLoop(bool* wasStopped) NO_THREAD_SAFETY_ANALYSIS {
    AMediaCodecBufferInfo bufferInfo;
    int32_t lastProgressUpdate = 0;
    int trackEosCount = 0;
@@ -242,8 +245,9 @@ media_status_t MediaSampleWriter::runWriterLoop() NO_THREAD_SAFETY_ANALYSIS {
                mSampleSignal.wait(lock);
            }

            if (mState != STARTED) {
                return AMEDIA_ERROR_UNKNOWN;  // TODO(lnilsson): Custom error code.
            if (mState == STOPPED) {
                *wasStopped = true;
                return AMEDIA_OK;
            }

            auto& topEntry = mSampleQueue.top();
+22 −19
Original line number Diff line number Diff line
@@ -69,41 +69,44 @@ bool MediaTrackTranscoder::start() {
        LOG(ERROR) << "TrackTranscoder must be configured before started";
        return false;
    }
    mState = STARTED;

    std::thread([this] {
        bool stopped = false;
        media_status_t status = runTranscodeLoop(&stopped);

    mTranscodingThread = std::thread([this] {
        media_status_t status = runTranscodeLoop();
        // Output an EOS sample if the transcoder was stopped.
        if (stopped) {
            auto sample = std::make_shared<MediaSample>();
            sample->info.flags = SAMPLE_FLAG_END_OF_STREAM;
            onOutputSampleAvailable(sample);
        }

        // Notify the client.
        if (auto callbacks = mTranscoderCallback.lock()) {
            if (status != AMEDIA_OK) {
                callbacks->onTrackError(this, status);
            } else {
            if (stopped) {
                callbacks->onTrackStopped(this);
            } else if (status == AMEDIA_OK) {
                callbacks->onTrackFinished(this);
            } else {
                callbacks->onTrackError(this, status);
            }
        }
    });
    }).detach();

    mState = STARTED;
    return true;
}

bool MediaTrackTranscoder::stop() {
void MediaTrackTranscoder::stop(bool stopOnSyncSample) {
    std::scoped_lock lock{mStateMutex};

    if (mState == STARTED) {
    if (mState == STARTED || (mStopRequest == STOP_ON_SYNC && !stopOnSyncSample)) {
        mStopRequest = stopOnSyncSample ? STOP_ON_SYNC : STOP_NOW;
        abortTranscodeLoop();
        mMediaSampleReader->setEnforceSequentialAccess(false);
        mTranscodingThread.join();
        {
            std::scoped_lock lock{mSampleMutex};
            mSampleQueue.abort();  // Release any buffered samples.
        }
        mState = STOPPED;
        return true;
    } else {
        LOG(WARNING) << "TrackTranscoder must be started before stopped";
    }

    LOG(ERROR) << "TrackTranscoder must be started before stopped";
    return false;
}

void MediaTrackTranscoder::notifyTrackFormatAvailable() {
+148 −53
Original line number Diff line number Diff line
@@ -69,38 +69,67 @@ static AMediaFormat* mergeMediaFormats(AMediaFormat* base, AMediaFormat* overlay
    return format;
}

void MediaTranscoder::sendCallback(media_status_t status) {
    // If the transcoder is already cancelled explicitly, don't send any error callbacks.
    // Tracks and sample writer will report errors for abort. However, currently we can't
    // tell it apart from real errors. Ideally we still want to report real errors back
    // to client, as there is a small chance that explicit abort and the real error come
    // at around the same time, we should report that if abort has a specific error code.
    // On the other hand, if the transcoder actually finished (status is AMEDIA_OK) at around
    // the same time of the abort, we should still report the finish back to the client.
    if (mCancelled && status != AMEDIA_OK) {
        return;
void MediaTranscoder::onThreadFinished(const void* thread, media_status_t threadStatus,
                                       bool threadStopped) {
    LOG(DEBUG) << "Thread " << thread << " finished with status " << threadStatus << " stopped "
               << threadStopped;

    // Stop all threads if one reports an error.
    if (threadStatus != AMEDIA_OK) {
        requestStop(false /* stopOnSync */);
    }

    bool expected = false;
    if (mCallbackSent.compare_exchange_strong(expected, true)) {
        if (status == AMEDIA_OK) {
            mCallbacks->onFinished(this);
    std::scoped_lock lock{mThreadStateMutex};

    // Record the change.
    mThreadStates[thread] = DONE;
    if (threadStatus != AMEDIA_OK && mTranscoderStatus == AMEDIA_OK) {
        mTranscoderStatus = threadStatus;
    }

    mTranscoderStopped |= threadStopped;

    // Check if all threads are done. Note that if all transcoders have stopped but the sample
    // writer has not yet started, it never will.
    bool transcodersDone = true;
    ThreadState sampleWriterState = PENDING;
    for (const auto& it : mThreadStates) {
        LOG(DEBUG) << "  Thread " << it.first << " state" << it.second;
        if (it.first == static_cast<const void*>(mSampleWriter.get())) {
            sampleWriterState = it.second;
        } else {
            mCallbacks->onError(this, status);
            transcodersDone &= (it.second == DONE);
        }
    }
    if (!transcodersDone || sampleWriterState == RUNNING) {
        return;
    }

        // Transcoding is done and the callback to the client has been sent, so tear down the
        // pipeline but do it asynchronously to avoid deadlocks. If an error occurred, client
        // should clean up the file.
        std::thread asyncCancelThread{[self = shared_from_this()] { self->cancel(); }};
        asyncCancelThread.detach();
    // All done. Send callback asynchronously and wake up threads waiting in cancel/pause.
    mThreadsDone = true;
    if (!mCallbackSent) {
        std::thread asyncNotificationThread{[this, self = shared_from_this(),
                                             status = mTranscoderStatus,
                                             stopped = mTranscoderStopped] {
            // If the transcoder was stopped that means a caller is waiting in stop or pause
            // in which case we don't send a callback.
            if (status != AMEDIA_OK) {
                mCallbacks->onError(this, status);
            } else if (!stopped) {
                mCallbacks->onFinished(this);
            }
            mThreadsDoneSignal.notify_all();
        }};
        asyncNotificationThread.detach();
        mCallbackSent = true;
    }
}

void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transcoder) {
    LOG(INFO) << "TrackTranscoder " << transcoder << " format available.";
    LOG(DEBUG) << "TrackTranscoder " << transcoder << " format available.";

    std::scoped_lock lock{mTracksAddedMutex};
    const void* sampleWriterPtr = static_cast<const void*>(mSampleWriter.get());

    // Ignore duplicate format change.
    if (mTracksAdded.count(transcoder) > 0) {
@@ -111,7 +140,7 @@ void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transco
    auto consumer = mSampleWriter->addTrack(transcoder->getOutputFormat());
    if (consumer == nullptr) {
        LOG(ERROR) << "Unable to add track to sample writer.";
        sendCallback(AMEDIA_ERROR_UNKNOWN);
        onThreadFinished(sampleWriterPtr, AMEDIA_ERROR_UNKNOWN, false /* stopped */);
        return;
    }

@@ -119,34 +148,57 @@ void MediaTranscoder::onTrackFormatAvailable(const MediaTrackTranscoder* transco
    mutableTranscoder->setSampleConsumer(consumer);

    mTracksAdded.insert(transcoder);
    bool errorStarting = false;
    if (mTracksAdded.size() == mTrackTranscoders.size()) {
        // Enable sequential access mode on the sample reader to achieve optimal read performance.
        // This has to wait until all tracks have delivered their output formats and the sample
        // writer is started. Otherwise the tracks will not get their output sample queues drained
        // and the transcoder could hang due to one track running out of buffers and blocking the
        // other tracks from reading source samples before they could output their formats.

        std::scoped_lock lock{mThreadStateMutex};
        // Don't start the sample writer if a stop already has been requested.
        if (!mSampleWriterStopped) {
            if (!mCancelled) {
                mSampleReader->setEnforceSequentialAccess(true);
        LOG(INFO) << "Starting sample writer.";
        bool started = mSampleWriter->start();
        if (!started) {
            LOG(ERROR) << "Unable to start sample writer.";
            sendCallback(AMEDIA_ERROR_UNKNOWN);
            }
            LOG(DEBUG) << "Starting sample writer.";
            errorStarting = !mSampleWriter->start();
            if (!errorStarting) {
                mThreadStates[sampleWriterPtr] = RUNNING;
            }
        }
    }

    if (errorStarting) {
        LOG(ERROR) << "Unable to start sample writer.";
        onThreadFinished(sampleWriterPtr, AMEDIA_ERROR_UNKNOWN, false /* stopped */);
    }
}

void MediaTranscoder::onTrackFinished(const MediaTrackTranscoder* transcoder) {
    LOG(DEBUG) << "TrackTranscoder " << transcoder << " finished";
    onThreadFinished(static_cast<const void*>(transcoder), AMEDIA_OK, false /* stopped */);
}

void MediaTranscoder::onTrackStopped(const MediaTrackTranscoder* transcoder) {
    LOG(DEBUG) << "TrackTranscoder " << transcoder << " stopped";
    onThreadFinished(static_cast<const void*>(transcoder), AMEDIA_OK, true /* stopped */);
}

void MediaTranscoder::onTrackError(const MediaTrackTranscoder* transcoder, media_status_t status) {
    LOG(ERROR) << "TrackTranscoder " << transcoder << " returned error " << status;
    sendCallback(status);
    onThreadFinished(static_cast<const void*>(transcoder), status, false /* stopped */);
}

void MediaTranscoder::onFinished(const MediaSampleWriter* writer, media_status_t status) {
    LOG(status == AMEDIA_OK ? DEBUG : ERROR) << "Sample writer finished with status " << status;
    onThreadFinished(static_cast<const void*>(writer), status, false /* stopped */);
}

void MediaTranscoder::onFinished(const MediaSampleWriter* writer __unused, media_status_t status) {
    LOG((status != AMEDIA_OK) ? ERROR : DEBUG) << "Sample writer finished with status " << status;
    sendCallback(status);
void MediaTranscoder::onStopped(const MediaSampleWriter* writer) {
    LOG(DEBUG) << "Sample writer " << writer << " stopped";
    onThreadFinished(static_cast<const void*>(writer), AMEDIA_OK, true /* stopped */);
}

void MediaTranscoder::onProgressUpdate(const MediaSampleWriter* writer __unused, int32_t progress) {
@@ -276,6 +328,9 @@ media_status_t MediaTranscoder::configureTrackFormat(size_t trackIndex, AMediaFo
        return status;
    }

    std::scoped_lock lock{mThreadStateMutex};
    mThreadStates[static_cast<const void*>(transcoder.get())] = PENDING;

    mTrackTranscoders.emplace_back(std::move(transcoder));
    return AMEDIA_OK;
}
@@ -300,6 +355,8 @@ media_status_t MediaTranscoder::configureDestination(int fd) {
        return AMEDIA_ERROR_UNKNOWN;
    }

    std::scoped_lock lock{mThreadStateMutex};
    mThreadStates[static_cast<const void*>(mSampleWriter.get())] = PENDING;
    return AMEDIA_OK;
}

@@ -313,42 +370,80 @@ media_status_t MediaTranscoder::start() {
    }

    // Start transcoders
    bool started = true;
    {
        std::scoped_lock lock{mThreadStateMutex};
        for (auto& transcoder : mTrackTranscoders) {
        bool started = transcoder->start();
            if (!(started = transcoder->start())) {
                break;
            }
            mThreadStates[static_cast<const void*>(transcoder.get())] = RUNNING;
        }
    }
    if (!started) {
        LOG(ERROR) << "Unable to start track transcoder.";
        cancel();
        return AMEDIA_ERROR_UNKNOWN;
    }
    return AMEDIA_OK;
}

media_status_t MediaTranscoder::requestStop(bool stopOnSync) {
    std::scoped_lock lock{mThreadStateMutex};
    if (mCancelled) {
        LOG(DEBUG) << "MediaTranscoder already cancelled";
        return AMEDIA_ERROR_UNSUPPORTED;
    }

    if (!stopOnSync) {
        mSampleWriterStopped = true;
        mSampleWriter->stop();
    }

    mSampleReader->setEnforceSequentialAccess(false);
    for (auto& transcoder : mTrackTranscoders) {
        transcoder->stop(stopOnSync);
    }

    mCancelled = true;
    return AMEDIA_OK;
}

media_status_t MediaTranscoder::pause(std::shared_ptr<ndk::ScopedAParcel>* pausedState) {
    // TODO: write internal states to parcel.
    *pausedState = std::shared_ptr<::ndk::ScopedAParcel>(new ::ndk::ScopedAParcel());
    return cancel();
void MediaTranscoder::waitForThreads() NO_THREAD_SAFETY_ANALYSIS {
    std::unique_lock lock{mThreadStateMutex};
    while (!mThreadsDone) {
        mThreadsDoneSignal.wait(lock);
    }
}

media_status_t MediaTranscoder::resume() {
    // TODO: restore internal states from parcel.
    return start();
media_status_t MediaTranscoder::pause(std::shared_ptr<ndk::ScopedAParcel>* pausedState) {
    media_status_t status = requestStop(true /* stopOnSync */);
    if (status != AMEDIA_OK) {
        return status;
    }

media_status_t MediaTranscoder::cancel() {
    bool expected = false;
    if (!mCancelled.compare_exchange_strong(expected, true)) {
        // Already cancelled.
    waitForThreads();

    // TODO: write internal states to parcel.
    *pausedState = std::shared_ptr<::ndk::ScopedAParcel>(new ::ndk::ScopedAParcel());
    return AMEDIA_OK;
}

    mSampleWriter->stop();
    mSampleReader->setEnforceSequentialAccess(false);
    for (auto& transcoder : mTrackTranscoders) {
        transcoder->stop();
media_status_t MediaTranscoder::cancel() {
    media_status_t status = requestStop(false /* stopOnSync */);
    if (status != AMEDIA_OK) {
        return status;
    }

    waitForThreads();

    // TODO: Release transcoders?
    return AMEDIA_OK;
}

media_status_t MediaTranscoder::resume() {
    // TODO: restore internal states from parcel.
    return start();
}

}  // namespace android
+15 −9
Original line number Diff line number Diff line
@@ -93,9 +93,10 @@ media_status_t PassthroughTrackTranscoder::configureDestinationFormat(
    return AMEDIA_OK;
}

media_status_t PassthroughTrackTranscoder::runTranscodeLoop() {
media_status_t PassthroughTrackTranscoder::runTranscodeLoop(bool* stopped) {
    MediaSampleInfo info;
    std::shared_ptr<MediaSample> sample;
    bool eosReached = false;

    // Notify the track format as soon as we start. It's same as the source format.
    notifyTrackFormatAvailable();
@@ -106,18 +107,18 @@ media_status_t PassthroughTrackTranscoder::runTranscodeLoop() {
            };

    // Move samples until EOS is reached or transcoding is stopped.
    while (!mStopRequested && !mEosFromSource) {
    while (mStopRequest != STOP_NOW && !eosReached) {
        media_status_t status = mMediaSampleReader->getSampleInfoForTrack(mTrackIndex, &info);

        if (status == AMEDIA_OK) {
            uint8_t* buffer = mBufferPool->getBufferWithSize(info.size);
            if (buffer == nullptr) {
                if (mStopRequested) {
                if (mStopRequest == STOP_NOW) {
                    break;
                }

                LOG(ERROR) << "Unable to get buffer from pool";
                return AMEDIA_ERROR_IO;  // TODO: Custom error codes?
                return AMEDIA_ERROR_UNKNOWN;
            }

            sample = MediaSample::createWithReleaseCallback(
@@ -131,7 +132,7 @@ media_status_t PassthroughTrackTranscoder::runTranscodeLoop() {

        } else if (status == AMEDIA_ERROR_END_OF_STREAM) {
            sample = std::make_shared<MediaSample>();
            mEosFromSource = true;
            eosReached = true;
        } else {
            LOG(ERROR) << "Unable to get next sample info. Aborting transcode.";
            return status;
@@ -139,18 +140,23 @@ media_status_t PassthroughTrackTranscoder::runTranscodeLoop() {

        sample->info = info;
        onOutputSampleAvailable(sample);

        if (mStopRequest == STOP_ON_SYNC && info.flags & SAMPLE_FLAG_SYNC_SAMPLE) {
            break;
        }
    }

    if (mStopRequested && !mEosFromSource) {
        return AMEDIA_ERROR_UNKNOWN;  // TODO: Custom error codes?
    if (mStopRequest != NONE && !eosReached) {
        *stopped = true;
    }
    return AMEDIA_OK;
}

void PassthroughTrackTranscoder::abortTranscodeLoop() {
    mStopRequested = true;
    if (mStopRequest == STOP_NOW) {
        mBufferPool->abort();
    }
}

std::shared_ptr<AMediaFormat> PassthroughTrackTranscoder::getOutputFormat() const {
    return mSourceFormat;
Loading