Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit de60f064 authored by Chong Zhang's avatar Chong Zhang
Browse files

transcoding: use actual callback in SimulatedTranscoder

bug: 154734285
test: unit testing

Change-Id: I39f17e95e9cc9ea73a60828b00a3c79ccf11dbf2
parent 9da2cbf1
Loading
Loading
Loading
Loading
+67 −67
Original line number Diff line number Diff line
@@ -291,15 +291,16 @@ bool TranscodingJobScheduler::getJob(ClientIdType clientId, JobIdType jobId,
    return true;
}

void TranscodingJobScheduler::onFinish(ClientIdType clientId, JobIdType jobId) {
void TranscodingJobScheduler::notifyClient(ClientIdType clientId, JobIdType jobId,
                                           const char* reason,
                                           std::function<void(const JobKeyType&)> func) {
    JobKeyType jobKey = std::make_pair(clientId, jobId);

    ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str());

    std::scoped_lock lock{mLock};

    if (mJobMap.count(jobKey) == 0) {
        ALOGW("ignoring finish for non-existent job");
        ALOGW("%s: ignoring %s for job %s that doesn't exist", __FUNCTION__, reason,
              jobToString(jobKey).c_str());
        return;
    }

@@ -307,14 +308,48 @@ void TranscodingJobScheduler::onFinish(ClientIdType clientId, JobIdType jobId) {
    // to client if the job is paused. Transcoder could have posted finish when
    // we're pausing it, and the finish arrived after we changed current job.
    if (mJobMap[jobKey].state == Job::NOT_STARTED) {
        ALOGW("ignoring finish for job that was never started");
        ALOGW("%s: ignoring %s for job %s that was never started", __FUNCTION__, reason,
              jobToString(jobKey).c_str());
        return;
    }

    ALOGV("%s: job %s %s", __FUNCTION__, jobToString(jobKey).c_str(), reason);
    func(jobKey);
}

void TranscodingJobScheduler::onStarted(ClientIdType clientId, JobIdType jobId) {
    notifyClient(clientId, jobId, "started", [=](const JobKeyType& jobKey) {
        auto callback = mJobMap[jobKey].callback.lock();
        if (callback != nullptr) {
            callback->onTranscodingStarted(jobId);
        }
    });
}

void TranscodingJobScheduler::onPaused(ClientIdType clientId, JobIdType jobId) {
    notifyClient(clientId, jobId, "paused", [=](const JobKeyType& jobKey) {
        auto callback = mJobMap[jobKey].callback.lock();
        if (callback != nullptr) {
            callback->onTranscodingPaused(jobId);
        }
    });
}

void TranscodingJobScheduler::onResumed(ClientIdType clientId, JobIdType jobId) {
    notifyClient(clientId, jobId, "resumed", [=](const JobKeyType& jobKey) {
        auto callback = mJobMap[jobKey].callback.lock();
        if (callback != nullptr) {
            callback->onTranscodingResumed(jobId);
        }
    });
}

void TranscodingJobScheduler::onFinish(ClientIdType clientId, JobIdType jobId) {
    notifyClient(clientId, jobId, "finish", [=](const JobKeyType& jobKey) {
        {
            auto clientCallback = mJobMap[jobKey].callback.lock();
            if (clientCallback != nullptr) {
            clientCallback->onTranscodingFinished(jobId, TranscodingResultParcel({jobId, 0}));
                clientCallback->onTranscodingFinished(jobId, TranscodingResultParcel({jobId, -1 /*actualBitrateBps*/}));
            }
        }

@@ -325,29 +360,12 @@ void TranscodingJobScheduler::onFinish(ClientIdType clientId, JobIdType jobId) {
        updateCurrentJob_l();

        validateState_l();
    });
}

void TranscodingJobScheduler::onError(ClientIdType clientId, JobIdType jobId,
                                      TranscodingErrorCode err) {
    JobKeyType jobKey = std::make_pair(clientId, jobId);

    ALOGV("%s: job %s, err %d", __FUNCTION__, jobToString(jobKey).c_str(), (int32_t)err);

    std::scoped_lock lock{mLock};

    if (mJobMap.count(jobKey) == 0) {
        ALOGW("ignoring error for non-existent job");
        return;
    }

    // Only ignore if job was never started. In particular, propagate the status
    // to client if the job is paused. Transcoder could have posted finish when
    // we're pausing it, and the finish arrived after we changed current job.
    if (mJobMap[jobKey].state == Job::NOT_STARTED) {
        ALOGW("ignoring error for job that was never started");
        return;
    }

    notifyClient(clientId, jobId, "error", [=](const JobKeyType& jobKey) {
        {
            auto clientCallback = mJobMap[jobKey].callback.lock();
            if (clientCallback != nullptr) {
@@ -362,35 +380,17 @@ void TranscodingJobScheduler::onError(ClientIdType clientId, JobIdType jobId,
        updateCurrentJob_l();

        validateState_l();
    });
}

void TranscodingJobScheduler::onProgressUpdate(ClientIdType clientId, JobIdType jobId,
                                               int32_t progress) {
    JobKeyType jobKey = std::make_pair(clientId, jobId);

    ALOGV("%s: job %s, progress %d", __FUNCTION__, jobToString(jobKey).c_str(), progress);

    std::scoped_lock lock{mLock};

    if (mJobMap.count(jobKey) == 0) {
        ALOGW("ignoring progress for non-existent job");
        return;
    }

    // Only ignore if job was never started. In particular, propagate the status
    // to client if the job is paused. Transcoder could have posted finish when
    // we're pausing it, and the finish arrived after we changed current job.
    if (mJobMap[jobKey].state == Job::NOT_STARTED) {
        ALOGW("ignoring progress for job that was never started");
        return;
    }

    {
        auto clientCallback = mJobMap[jobKey].callback.lock();
        if (clientCallback != nullptr) {
            clientCallback->onProgressUpdate(jobId, progress);
        }
    notifyClient(clientId, jobId, "progress", [=](const JobKeyType& jobKey) {
        auto callback = mJobMap[jobKey].callback.lock();
        if (callback != nullptr) {
            callback->onProgressUpdate(jobId, progress);
        }
    });
}

void TranscodingJobScheduler::onResourceLost() {
+3 −0
Original line number Diff line number Diff line
@@ -48,6 +48,9 @@ protected:
class TranscoderCallbackInterface {
public:
    // TODO(chz): determine what parameters are needed here.
    virtual void onStarted(ClientIdType clientId, JobIdType jobId) = 0;
    virtual void onPaused(ClientIdType clientId, JobIdType jobId) = 0;
    virtual void onResumed(ClientIdType clientId, JobIdType jobId) = 0;
    virtual void onFinish(ClientIdType clientId, JobIdType jobId) = 0;
    virtual void onError(ClientIdType clientId, JobIdType jobId, TranscodingErrorCode err) = 0;
    virtual void onProgressUpdate(ClientIdType clientId, JobIdType jobId, int32_t progress) = 0;
+5 −1
Original line number Diff line number Diff line
@@ -47,6 +47,9 @@ public:
    // ~SchedulerClientInterface

    // TranscoderCallbackInterface
    void onStarted(ClientIdType clientId, JobIdType jobId) override;
    void onPaused(ClientIdType clientId, JobIdType jobId) override;
    void onResumed(ClientIdType clientId, JobIdType jobId) override;
    void onFinish(ClientIdType clientId, JobIdType jobId) override;
    void onError(ClientIdType clientId, JobIdType jobId, TranscodingErrorCode err) override;
    void onProgressUpdate(ClientIdType clientId, JobIdType jobId, int32_t progress) override;
@@ -105,7 +108,8 @@ private:
    void updateCurrentJob_l();
    void removeJob_l(const JobKeyType& jobKey);
    void moveUidsToTop_l(const std::unordered_set<uid_t>& uids, bool preserveTopUid);

    void notifyClient(ClientIdType clientId, JobIdType jobId, const char* reason,
                      std::function<void(const JobKeyType&)> func);
    // Internal state verifier (debug only)
    void validateState_l();

+24 −9
Original line number Diff line number Diff line
@@ -54,27 +54,43 @@ void SimulatedTranscoder::start(ClientIdType clientId, JobIdType jobId,
    }
    ALOGV("%s: job {%d}: processingTime: %lld", __FUNCTION__, jobId,
          (long long)mJobProcessingTimeMs);
    queueEvent(Event::Start, clientId, jobId);
    queueEvent(Event::Start, clientId, jobId, [=] {
        auto callback = mCallback.lock();
        if (callback != nullptr) {
            callback->onStarted(clientId, jobId);
        }
    });
}

void SimulatedTranscoder::pause(ClientIdType clientId, JobIdType jobId) {
    queueEvent(Event::Pause, clientId, jobId);
    queueEvent(Event::Pause, clientId, jobId, [=] {
        auto callback = mCallback.lock();
        if (callback != nullptr) {
            callback->onPaused(clientId, jobId);
        }
    });
}

void SimulatedTranscoder::resume(ClientIdType clientId, JobIdType jobId) {
    queueEvent(Event::Resume, clientId, jobId);
    queueEvent(Event::Resume, clientId, jobId, [=] {
        auto callback = mCallback.lock();
        if (callback != nullptr) {
            callback->onResumed(clientId, jobId);
        }
    });
}

void SimulatedTranscoder::stop(ClientIdType clientId, JobIdType jobId) {
    queueEvent(Event::Stop, clientId, jobId);
    queueEvent(Event::Stop, clientId, jobId, nullptr);
}

void SimulatedTranscoder::queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId) {
void SimulatedTranscoder::queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId,
                                     std::function<void()> runnable) {
    ALOGV("%s: job {%lld, %d}: %s", __FUNCTION__, (long long)clientId, jobId, toString(type));

    auto lock = std::scoped_lock(mLock);

    mQueue.push_back({type, clientId, jobId});
    mQueue.push_back({type, clientId, jobId, runnable});
    mCondition.notify_one();
}

@@ -139,10 +155,9 @@ void SimulatedTranscoder::threadLoop() {
                continue;
            }

            auto callback = mCallback.lock();
            if (callback != nullptr) {
            if (event.runnable != nullptr) {
                lock.unlock();
                callback->onProgressUpdate(event.clientId, event.jobId, event.type);
                event.runnable();
                lock.lock();
            }
        }
+3 −1
Original line number Diff line number Diff line
@@ -42,6 +42,7 @@ public:
        enum Type { NoEvent, Start, Pause, Resume, Stop, Finished, Failed } type;
        ClientIdType clientId;
        JobIdType jobId;
        std::function<void()> runnable;
    };

    static constexpr int64_t kJobDurationUs = 1000000;
@@ -67,7 +68,8 @@ private:
    int64_t mJobProcessingTimeMs = kJobDurationUs / 1000;

    static const char* toString(Event::Type type);
    void queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId);
    void queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId,
                    std::function<void()> runnable);
    void threadLoop();
};

Loading