Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit b55c5456 authored by Chong Zhang's avatar Chong Zhang
Browse files

transcoder: initial version of pause/resume

- Add pause/resume in TranscoderWrapper, save paused state on pause
  and use it to create new transcoder on resume.

Misc fixes:

- TranscoderWrapper::stop should only cancel transcoder if the stop
  is for the currently running job. Scheduler could call stop to
  cancel a job any time.
- Don't hold TranscoderWrapper lock when running event runnable. If
  the runnable calls back into scheduler, and scheduler may call
  transcoder again and deadlock.
- Don't report abort as error if the transcoder is cancelled explicitly.
- Push decoder/encoder start as msgs, so that they could be skipped too
  if the job is cancelled shortly after starts.

Tests:
Add tests for cancel/pause/resume with real transcoder.

bug: 154734285
bug: 154733948
test: unit testing
Change-Id: I2b7d3da69df53b92ab351db455310799ba0e0e8f
parent 1ad03953
Loading
Loading
Loading
Loading
+170 −60
Original line number Diff line number Diff line
@@ -56,6 +56,38 @@ static TranscodingErrorCode toTranscodingError(media_status_t status) {
    }
}

static AMediaFormat* getVideoFormat(
        const char* originalMime,
        const std::optional<TranscodingVideoTrackFormat>& requestedFormat) {
    if (requestedFormat == std::nullopt) {
        return nullptr;
    }

    AMediaFormat* format = AMediaFormat_new();
    bool changed = false;
    if (requestedFormat->codecType == TranscodingVideoCodecType::kHevc &&
        strcmp(originalMime, AMEDIA_MIMETYPE_VIDEO_HEVC)) {
        AMediaFormat_setString(format, AMEDIAFORMAT_KEY_MIME, AMEDIA_MIMETYPE_VIDEO_HEVC);
        changed = true;
    } else if (requestedFormat->codecType == TranscodingVideoCodecType::kAvc &&
               strcmp(originalMime, AMEDIA_MIMETYPE_VIDEO_AVC)) {
        AMediaFormat_setString(format, AMEDIAFORMAT_KEY_MIME, AMEDIA_MIMETYPE_VIDEO_AVC);
        changed = true;
    }
    if (requestedFormat->bitrateBps > 0) {
        AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_BIT_RATE, requestedFormat->bitrateBps);
        changed = true;
    }
    // TODO: translate other fields from requestedFormat to the format for MediaTranscoder.
    // Also need to determine more settings to expose in TranscodingVideoTrackFormat.
    if (!changed) {
        AMediaFormat_delete(format);
        // Use null format for passthru.
        format = nullptr;
    }
    return format;
}

//static
const char* TranscoderWrapper::toString(Event::Type type) {
    switch (type) {
@@ -105,7 +137,7 @@ public:
    }

    virtual void onCodecResourceLost(const MediaTranscoder* transcoder __unused,
                                     const std::shared_ptr<const Parcelable>& pausedState
                                     const std::shared_ptr<const Parcel>& pausedState
                                             __unused) override {
        ALOGV("%s: job {%lld, %d}", __FUNCTION__, (long long)mClientId, mJobId);
    }
@@ -126,51 +158,87 @@ void TranscoderWrapper::setCallback(const std::shared_ptr<TranscoderCallbackInte

void TranscoderWrapper::start(ClientIdType clientId, JobIdType jobId,
                              const TranscodingRequestParcel& request,
                              const std::shared_ptr<ITranscodingClientCallback>& callback) {
                              const std::shared_ptr<ITranscodingClientCallback>& clientCb) {
    queueEvent(Event::Start, clientId, jobId, [=] {
        TranscodingErrorCode err = handleStart(clientId, jobId, request, callback);
        TranscodingErrorCode err = handleStart(clientId, jobId, request, clientCb);

        auto callback = mCallback.lock();
        if (err != TranscodingErrorCode::kNoError) {
            cleanup();

            auto callback = mCallback.lock();
            if (callback != nullptr) {
                callback->onError(clientId, jobId, err);
            }
        } else {
            if (callback != nullptr) {
                callback->onStarted(clientId, jobId);
            }
        }
    });
}

void TranscoderWrapper::pause(ClientIdType clientId, JobIdType jobId) {
    queueEvent(Event::Pause, clientId, jobId, [] {});
    queueEvent(Event::Pause, clientId, jobId, [=] {
        TranscodingErrorCode err = handlePause(clientId, jobId);

        cleanup();

        auto callback = mCallback.lock();
        if (callback != nullptr) {
            if (err != TranscodingErrorCode::kNoError) {
                callback->onError(clientId, jobId, err);
            } else {
                callback->onPaused(clientId, jobId);
            }
        }
    });
}

void TranscoderWrapper::resume(ClientIdType clientId, JobIdType jobId) {
    queueEvent(Event::Resume, clientId, jobId, [] {});
void TranscoderWrapper::resume(ClientIdType clientId, JobIdType jobId,
                               const TranscodingRequestParcel& request,
                               const std::shared_ptr<ITranscodingClientCallback>& clientCb) {
    queueEvent(Event::Resume, clientId, jobId, [=] {
        TranscodingErrorCode err = handleResume(clientId, jobId, request, clientCb);

        auto callback = mCallback.lock();
        if (err != TranscodingErrorCode::kNoError) {
            cleanup();

            if (callback != nullptr) {
                callback->onError(clientId, jobId, err);
            }
        } else {
            if (callback != nullptr) {
                callback->onResumed(clientId, jobId);
            }
        }
    });
}

void TranscoderWrapper::stop(ClientIdType clientId, JobIdType jobId) {
    queueEvent(Event::Stop, clientId, jobId, [=] {
        if (clientId != mCurrentClientId || jobId != mCurrentJobId) {
            ALOGW("Stopping job {%lld, %d} that's not current job {%lld, %d}", (long long)clientId,
                  jobId, (long long)mCurrentClientId, mCurrentJobId);
        }

        // stop transcoder.
        if (mTranscoder != nullptr && clientId == mCurrentClientId && jobId == mCurrentJobId) {
            // Cancelling the currently running job.
            media_status_t err = mTranscoder->cancel();
            if (err != AMEDIA_OK) {
                ALOGE("failed to stop transcoder: %d", err);
            } else {
                ALOGI("transcoder stopped");
            }

            cleanup();
        } else {
            // For jobs that's not currently running, release any pausedState for the job.
            mPausedStateMap.erase(JobKeyType(clientId, jobId));
        }
        // No callback needed for stop.
    });
}

void TranscoderWrapper::onFinish(ClientIdType clientId, JobIdType jobId) {
    queueEvent(Event::Finish, clientId, jobId, [=] {
        if (mTranscoder != nullptr && clientId == mCurrentClientId && jobId == mCurrentJobId) {
            cleanup();
        }

        auto callback = mCallback.lock();
        if (callback != nullptr) {
@@ -182,7 +250,9 @@ void TranscoderWrapper::onFinish(ClientIdType clientId, JobIdType jobId) {
void TranscoderWrapper::onError(ClientIdType clientId, JobIdType jobId,
                                TranscodingErrorCode error) {
    queueEvent(Event::Error, clientId, jobId, [=] {
        if (mTranscoder != nullptr && clientId == mCurrentClientId && jobId == mCurrentJobId) {
            cleanup();
        }

        auto callback = mCallback.lock();
        if (callback != nullptr) {
@@ -191,41 +261,10 @@ void TranscoderWrapper::onError(ClientIdType clientId, JobIdType jobId,
    });
}

static AMediaFormat* getVideoFormat(
        const char* originalMime,
        const std::optional<TranscodingVideoTrackFormat>& requestedFormat) {
    if (requestedFormat == std::nullopt) {
        return nullptr;
    }

    AMediaFormat* format = AMediaFormat_new();
    bool changed = false;
    if (requestedFormat->codecType == TranscodingVideoCodecType::kHevc &&
        strcmp(originalMime, AMEDIA_MIMETYPE_VIDEO_HEVC)) {
        AMediaFormat_setString(format, AMEDIAFORMAT_KEY_MIME, AMEDIA_MIMETYPE_VIDEO_HEVC);
        changed = true;
    } else if (requestedFormat->codecType == TranscodingVideoCodecType::kAvc &&
               strcmp(originalMime, AMEDIA_MIMETYPE_VIDEO_AVC)) {
        AMediaFormat_setString(format, AMEDIAFORMAT_KEY_MIME, AMEDIA_MIMETYPE_VIDEO_AVC);
        changed = true;
    }
    if (requestedFormat->bitrateBps > 0) {
        AMediaFormat_setInt32(format, AMEDIAFORMAT_KEY_BIT_RATE, requestedFormat->bitrateBps);
        changed = true;
    }
    // TODO: translate other fields from requestedFormat to the format for MediaTranscoder.
    // Also need to determine more settings to expose in TranscodingVideoTrackFormat.
    if (!changed) {
        AMediaFormat_delete(format);
        // Use null format for passthru.
        format = nullptr;
    }
    return format;
}

TranscodingErrorCode TranscoderWrapper::handleStart(
TranscodingErrorCode TranscoderWrapper::setupTranscoder(
        ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request,
        const std::shared_ptr<ITranscodingClientCallback>& clientCb) {
        const std::shared_ptr<ITranscodingClientCallback>& clientCb,
        const std::shared_ptr<const Parcel>& pausedState) {
    if (clientCb == nullptr) {
        ALOGE("client callback is null");
        return TranscodingErrorCode::kInvalidParameter;
@@ -244,7 +283,10 @@ TranscodingErrorCode TranscoderWrapper::handleStart(
        return TranscodingErrorCode::kErrorIO;
    }

    status = clientCb->openFileDescriptor(request.destinationFilePath, "w", &dstFd);
    // Open dest file with "rw", as the transcoder could potentially reuse part of it
    // for resume case. We might want the further differentiate and open with "w" only
    // for start.
    status = clientCb->openFileDescriptor(request.destinationFilePath, "rw", &dstFd);
    if (!status.isOk() || dstFd.get() < 0) {
        ALOGE("failed to open destination");
        return TranscodingErrorCode::kErrorIO;
@@ -253,7 +295,7 @@ TranscodingErrorCode TranscoderWrapper::handleStart(
    mCurrentClientId = clientId;
    mCurrentJobId = jobId;
    mTranscoderCb = std::make_shared<CallbackImpl>(shared_from_this(), clientId, jobId);
    mTranscoder = MediaTranscoder::create(mTranscoderCb, nullptr);
    mTranscoder = MediaTranscoder::create(mTranscoderCb, pausedState);
    if (mTranscoder == nullptr) {
        ALOGE("failed to create transcoder");
        return TranscodingErrorCode::kUnknown;
@@ -296,13 +338,79 @@ TranscodingErrorCode TranscoderWrapper::handleStart(
        return toTranscodingError(err);
    }

    err = mTranscoder->start();
    return TranscodingErrorCode::kNoError;
}

TranscodingErrorCode TranscoderWrapper::handleStart(
        ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request,
        const std::shared_ptr<ITranscodingClientCallback>& clientCb) {
    ALOGI("setting up transcoder for start");
    TranscodingErrorCode err = setupTranscoder(clientId, jobId, request, clientCb);
    if (err != TranscodingErrorCode::kNoError) {
        ALOGI("%s: failed to setup transcoder", __FUNCTION__);
        return err;
    }

    media_status_t status = mTranscoder->start();
    if (status != AMEDIA_OK) {
        ALOGE("%s: failed to start transcoder: %d", __FUNCTION__, err);
        return toTranscodingError(status);
    }

    ALOGI("%s: transcoder started", __FUNCTION__);
    return TranscodingErrorCode::kNoError;
}

TranscodingErrorCode TranscoderWrapper::handlePause(ClientIdType clientId, JobIdType jobId) {
    if (mTranscoder == nullptr) {
        ALOGE("%s: transcoder is not running", __FUNCTION__);
        return TranscodingErrorCode::kInvalidOperation;
    }

    if (clientId != mCurrentClientId || jobId != mCurrentJobId) {
        ALOGW("%s: stopping job {%lld, %d} that's not current job {%lld, %d}", __FUNCTION__,
              (long long)clientId, jobId, (long long)mCurrentClientId, mCurrentJobId);
    }

    std::shared_ptr<const Parcel> pauseStates;
    media_status_t err = mTranscoder->pause(&pauseStates);
    if (err != AMEDIA_OK) {
        ALOGE("failed to start transcoder: %d", err);
        ALOGE("%s: failed to pause transcoder: %d", __FUNCTION__, err);
        return toTranscodingError(err);
    }
    mPausedStateMap[JobKeyType(clientId, jobId)] = pauseStates;

    ALOGI("%s: transcoder paused", __FUNCTION__);
    return TranscodingErrorCode::kNoError;
}

TranscodingErrorCode TranscoderWrapper::handleResume(
        ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request,
        const std::shared_ptr<ITranscodingClientCallback>& clientCb) {
    std::shared_ptr<const Parcel> pausedState;
    auto it = mPausedStateMap.find(JobKeyType(clientId, jobId));
    if (it != mPausedStateMap.end()) {
        pausedState = it->second;
        mPausedStateMap.erase(it);
    } else {
        ALOGE("%s: can't find paused state", __FUNCTION__);
        return TranscodingErrorCode::kInvalidOperation;
    }

    ALOGI("setting up transcoder for resume");
    TranscodingErrorCode err = setupTranscoder(clientId, jobId, request, clientCb, pausedState);
    if (err != TranscodingErrorCode::kNoError) {
        ALOGE("%s: failed to setup transcoder", __FUNCTION__);
        return err;
    }

    media_status_t status = mTranscoder->resume();
    if (status != AMEDIA_OK) {
        ALOGE("%s: failed to resume transcoder: %d", __FUNCTION__, err);
        return toTranscodingError(status);
    }

    ALOGI("transcoder started");
    ALOGI("%s: transcoder resumed", __FUNCTION__);
    return TranscodingErrorCode::kNoError;
}

@@ -339,7 +447,9 @@ void TranscoderWrapper::threadLoop() {
        ALOGD("%s: job {%lld, %d}: %s", __FUNCTION__, (long long)event.clientId, event.jobId,
              toString(event.type));

        lock.unlock();
        event.runnable();
        lock.lock();
    }
}

+2 −1
Original line number Diff line number Diff line
@@ -79,7 +79,8 @@ void TranscodingJobScheduler::updateCurrentJob_l() {
                mTranscoder->start(topJob->key.first, topJob->key.second, topJob->request,
                                   topJob->callback.lock());
            } else if (topJob->state == Job::PAUSED) {
                mTranscoder->resume(topJob->key.first, topJob->key.second);
                mTranscoder->resume(topJob->key.first, topJob->key.second, topJob->request,
                                    topJob->callback.lock());
            }
            topJob->state = Job::RUNNING;
        }
+3 −3
Original line number Diff line number Diff line
@@ -32,14 +32,14 @@ class TranscoderCallbackInterface;
// Interface for the scheduler to call the transcoder to take actions.
class TranscoderInterface {
public:
    // TODO(chz): determine what parameters are needed here.
    // For now, always pass in clientId&jobId.
    virtual void setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) = 0;
    virtual void start(ClientIdType clientId, JobIdType jobId,
                       const TranscodingRequestParcel& request,
                       const std::shared_ptr<ITranscodingClientCallback>& clientCallback) = 0;
    virtual void pause(ClientIdType clientId, JobIdType jobId) = 0;
    virtual void resume(ClientIdType clientId, JobIdType jobId) = 0;
    virtual void resume(ClientIdType clientId, JobIdType jobId,
                        const TranscodingRequestParcel& request,
                        const std::shared_ptr<ITranscodingClientCallback>& clientCallback) = 0;
    virtual void stop(ClientIdType clientId, JobIdType jobId) = 0;

protected:
+16 −1
Original line number Diff line number Diff line
@@ -21,11 +21,13 @@
#include <media/TranscoderInterface.h>

#include <list>
#include <map>
#include <mutex>

namespace android {

class MediaTranscoder;
class Parcelable;

/*
 * Wrapper class around MediaTranscoder.
@@ -41,7 +43,9 @@ public:
                       const TranscodingRequestParcel& request,
                       const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
    virtual void pause(ClientIdType clientId, JobIdType jobId) override;
    virtual void resume(ClientIdType clientId, JobIdType jobId) override;
    virtual void resume(ClientIdType clientId, JobIdType jobId,
                        const TranscodingRequestParcel& request,
                        const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
    virtual void stop(ClientIdType clientId, JobIdType jobId) override;

private:
@@ -52,12 +56,15 @@ private:
        JobIdType jobId;
        std::function<void()> runnable;
    };
    using JobKeyType = std::pair<ClientIdType, JobIdType>;

    std::shared_ptr<CallbackImpl> mTranscoderCb;
    std::shared_ptr<MediaTranscoder> mTranscoder;
    std::weak_ptr<TranscoderCallbackInterface> mCallback;
    std::mutex mLock;
    std::condition_variable mCondition;
    std::list<Event> mQueue;  // GUARDED_BY(mLock);
    std::map<JobKeyType, std::shared_ptr<const Parcel>> mPausedStateMap;
    ClientIdType mCurrentClientId;
    JobIdType mCurrentJobId;

@@ -68,6 +75,14 @@ private:
    TranscodingErrorCode handleStart(ClientIdType clientId, JobIdType jobId,
                                     const TranscodingRequestParcel& request,
                                     const std::shared_ptr<ITranscodingClientCallback>& callback);
    TranscodingErrorCode handlePause(ClientIdType clientId, JobIdType jobId);
    TranscodingErrorCode handleResume(ClientIdType clientId, JobIdType jobId,
                                      const TranscodingRequestParcel& request,
                                      const std::shared_ptr<ITranscodingClientCallback>& callback);
    TranscodingErrorCode setupTranscoder(
            ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request,
            const std::shared_ptr<ITranscodingClientCallback>& callback,
            const std::shared_ptr<const Parcel>& pausedState = nullptr);

    void cleanup();
    void queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId,
+2 −1
Original line number Diff line number Diff line
@@ -94,7 +94,8 @@ public:
    void pause(ClientIdType clientId, JobIdType jobId) override {
        mEventQueue.push_back(Pause(clientId, jobId));
    }
    void resume(ClientIdType clientId, JobIdType jobId) override {
    void resume(ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& /*request*/,
                const std::shared_ptr<ITranscodingClientCallback>& /*clientCallback*/) override {
        mEventQueue.push_back(Resume(clientId, jobId));
    }
    void stop(ClientIdType clientId, JobIdType jobId) override {
Loading