Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4c8abb48 authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "transcoding: add watchdog to prevent transcoder hang" into sc-dev

parents 6021c9a5 457c689a
Loading
Loading
Loading
Loading
+52 −7
Original line number Diff line number Diff line
@@ -113,6 +113,12 @@ std::string TranscoderWrapper::toString(const Event& event) {
    case Event::Progress:
        typeStr = "Progress";
        break;
    case Event::HeartBeat:
        typeStr = "HeartBeat";
        break;
    case Event::Abandon:
        typeStr = "Abandon";
        break;
    default:
        return "(unknown)";
    }
@@ -154,6 +160,13 @@ public:
        }
    }

    virtual void onHeartBeat(const MediaTranscoder* transcoder __unused) override {
        auto owner = mOwner.lock();
        if (owner != nullptr) {
            owner->onHeartBeat(mClientId, mSessionId);
        }
    }

    virtual void onCodecResourceLost(const MediaTranscoder* transcoder __unused,
                                     const std::shared_ptr<ndk::ScopedAParcel>& pausedState
                                             __unused) override {
@@ -166,12 +179,18 @@ private:
    SessionIdType mSessionId;
};

TranscoderWrapper::TranscoderWrapper() : mCurrentClientId(0), mCurrentSessionId(-1) {
    std::thread(&TranscoderWrapper::threadLoop, this).detach();
TranscoderWrapper::TranscoderWrapper(const std::shared_ptr<TranscoderCallbackInterface>& cb,
                                     int64_t heartBeatIntervalUs)
      : mCallback(cb),
        mHeartBeatIntervalUs(heartBeatIntervalUs),
        mCurrentClientId(0),
        mCurrentSessionId(-1),
        mLooperReady(false) {
    ALOGV("TranscoderWrapper CTOR: %p", this);
}

void TranscoderWrapper::setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) {
    mCallback = cb;
TranscoderWrapper::~TranscoderWrapper() {
    ALOGV("TranscoderWrapper DTOR: %p", this);
}

static bool isResourceError(media_status_t err) {
@@ -250,7 +269,7 @@ void TranscoderWrapper::resume(ClientIdType clientId, SessionIdType sessionId,
    });
}

void TranscoderWrapper::stop(ClientIdType clientId, SessionIdType sessionId) {
void TranscoderWrapper::stop(ClientIdType clientId, SessionIdType sessionId, bool abandon) {
    queueEvent(Event::Stop, clientId, sessionId, [=] {
        if (mTranscoder != nullptr && clientId == mCurrentClientId &&
            sessionId == mCurrentSessionId) {
@@ -268,6 +287,10 @@ void TranscoderWrapper::stop(ClientIdType clientId, SessionIdType sessionId) {
        }
        // No callback needed for stop.
    });

    if (abandon) {
        queueEvent(Event::Abandon, 0, 0, nullptr);
    }
}

void TranscoderWrapper::onFinish(ClientIdType clientId, SessionIdType sessionId) {
@@ -311,6 +334,15 @@ void TranscoderWrapper::onProgress(ClientIdType clientId, SessionIdType sessionI
            progress);
}

void TranscoderWrapper::onHeartBeat(ClientIdType clientId, SessionIdType sessionId) {
    queueEvent(Event::HeartBeat, clientId, sessionId, [=] {
        auto callback = mCallback.lock();
        if (callback != nullptr) {
            callback->onHeartBeat(clientId, sessionId);
        }
    });
}

media_status_t TranscoderWrapper::setupTranscoder(
        ClientIdType clientId, SessionIdType sessionId, const TranscodingRequestParcel& request,
        const std::shared_ptr<ITranscodingClientCallback>& clientCb,
@@ -353,8 +385,8 @@ media_status_t TranscoderWrapper::setupTranscoder(
    mCurrentClientId = clientId;
    mCurrentSessionId = sessionId;
    mTranscoderCb = std::make_shared<CallbackImpl>(shared_from_this(), clientId, sessionId);
    mTranscoder = MediaTranscoder::create(mTranscoderCb, request.clientPid, request.clientUid,
                                          pausedState);
    mTranscoder = MediaTranscoder::create(mTranscoderCb, mHeartBeatIntervalUs, request.clientPid,
                                          request.clientUid, pausedState);
    if (mTranscoder == nullptr) {
        ALOGE("failed to create transcoder");
        return AMEDIA_ERROR_UNKNOWN;
@@ -486,6 +518,15 @@ void TranscoderWrapper::queueEvent(Event::Type type, ClientIdType clientId, Sess
                                   const std::function<void()> runnable, int32_t arg) {
    std::scoped_lock lock{mLock};

    if (!mLooperReady) {
        // A shared_ptr to ourselves is given to the thread's stack, so that the TranscoderWrapper
        // object doesn't go away until the thread exits. When a watchdog timeout happens, this
        // allows the session controller to release its reference to the TranscoderWrapper object
        // without blocking on the thread exits.
        std::thread([owner = shared_from_this()]() { owner->threadLoop(); }).detach();
        mLooperReady = true;
    }

    mQueue.push_back({type, clientId, sessionId, runnable, arg});
    mCondition.notify_one();
}
@@ -505,6 +546,10 @@ void TranscoderWrapper::threadLoop() {

        ALOGD("%s: %s", __FUNCTION__, toString(event).c_str());

        if (event.type == Event::Abandon) {
            break;
        }

        lock.unlock();
        event.runnable();
        lock.lock();
+175 −7
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@
#include <media/TranscodingUidPolicy.h>
#include <utils/Log.h>

#include <thread>
#include <utility>

namespace android {
@@ -60,12 +61,140 @@ const char* TranscodingSessionController::sessionStateToString(const Session::St
    return "(unknown)";
}

///////////////////////////////////////////////////////////////////////////////
struct TranscodingSessionController::Watchdog {
    Watchdog(TranscodingSessionController* owner, int64_t timeoutUs);
    ~Watchdog();

    // Starts monitoring the session.
    void start(const SessionKeyType& key);
    // Stops monitoring the session.
    void stop();
    // Signals that the session is still alive. Must be sent at least every mTimeoutUs.
    // (Timeout will happen if no ping in mTimeoutUs since the last ping.)
    void keepAlive();

private:
    void threadLoop();
    void updateTimer_l();

    TranscodingSessionController* mOwner;
    const int64_t mTimeoutUs;
    mutable std::mutex mLock;
    std::condition_variable mCondition GUARDED_BY(mLock);
    // Whether watchdog is monitoring a session for timeout.
    bool mActive GUARDED_BY(mLock);
    // Whether watchdog is aborted and the monitoring thread should exit.
    bool mAbort GUARDED_BY(mLock);
    // When watchdog is active, the next timeout time point.
    std::chrono::system_clock::time_point mNextTimeoutTime GUARDED_BY(mLock);
    // When watchdog is active, the session being watched.
    SessionKeyType mSessionToWatch GUARDED_BY(mLock);
    std::thread mThread;
};

static constexpr int64_t kWatchdogTimeoutUs = 3000000LL;
static constexpr int64_t kTranscoderHeartBeatIntervalUs = 1000000LL;

TranscodingSessionController::Watchdog::Watchdog(TranscodingSessionController* owner,
                                                 int64_t timeoutUs)
      : mOwner(owner),
        mTimeoutUs(timeoutUs),
        mActive(false),
        mAbort(false),
        mThread(&Watchdog::threadLoop, this) {
    ALOGV("Watchdog CTOR: %p", this);
}

TranscodingSessionController::Watchdog::~Watchdog() {
    ALOGV("Watchdog DTOR: %p", this);

    {
        // Exit the looper thread.
        std::scoped_lock lock{mLock};

        mAbort = true;
        mCondition.notify_one();
    }

    mThread.join();
    ALOGV("Watchdog DTOR: %p, done.", this);
}

void TranscodingSessionController::Watchdog::start(const SessionKeyType& key) {
    std::scoped_lock lock{mLock};

    if (!mActive) {
        ALOGI("Watchdog start: %s", sessionToString(key).c_str());

        mActive = true;
        mSessionToWatch = key;
        updateTimer_l();
        mCondition.notify_one();
    }
}

void TranscodingSessionController::Watchdog::stop() {
    std::scoped_lock lock{mLock};

    if (mActive) {
        ALOGI("Watchdog stop: %s", sessionToString(mSessionToWatch).c_str());

        mActive = false;
        mCondition.notify_one();
    }
}

void TranscodingSessionController::Watchdog::keepAlive() {
    std::scoped_lock lock{mLock};

    if (mActive) {
        ALOGI("Watchdog keepAlive: %s", sessionToString(mSessionToWatch).c_str());

        updateTimer_l();
        mCondition.notify_one();
    }
}

// updateTimer_l() is only called with lock held.
void TranscodingSessionController::Watchdog::updateTimer_l() NO_THREAD_SAFETY_ANALYSIS {
    std::chrono::microseconds timeout(mTimeoutUs);
    mNextTimeoutTime = std::chrono::system_clock::now() + timeout;
}

// Unfortunately std::unique_lock is incompatible with -Wthread-safety.
void TranscodingSessionController::Watchdog::threadLoop() NO_THREAD_SAFETY_ANALYSIS {
    std::unique_lock<std::mutex> lock{mLock};

    while (!mAbort) {
        if (!mActive) {
            mCondition.wait(lock);
            continue;
        }
        // Watchdog active, wait till next timeout time.
        if (mCondition.wait_until(lock, mNextTimeoutTime) == std::cv_status::timeout) {
            // If timeout happens, report timeout and deactivate watchdog.
            mActive = false;
            // Make a copy of session key, as once we unlock, it could be unprotected.
            SessionKeyType sessionKey = mSessionToWatch;

            ALOGE("Watchdog timeout: %s", sessionToString(sessionKey).c_str());

            lock.unlock();
            mOwner->onError(sessionKey.first, sessionKey.second,
                            TranscodingErrorCode::kWatchdogTimeout);
            lock.lock();
        }
    }
}
///////////////////////////////////////////////////////////////////////////////

TranscodingSessionController::TranscodingSessionController(
        const std::shared_ptr<TranscoderInterface>& transcoder,
        const TranscoderFactoryType& transcoderFactory,
        const std::shared_ptr<UidPolicyInterface>& uidPolicy,
        const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy,
        const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy)
      : mTranscoder(transcoder),
      : mTranscoderFactory(transcoderFactory),
        mUidPolicy(uidPolicy),
        mResourcePolicy(resourcePolicy),
        mThermalPolicy(thermalPolicy),
@@ -160,6 +289,26 @@ TranscodingSessionController::Session* TranscodingSessionController::getTopSessi
    return &mSessionMap[topSessionKey];
}

void TranscodingSessionController::setSessionState_l(Session* session, Session::State state) {
    bool wasRunning = (session->getState() == Session::RUNNING);
    session->setState(state);
    bool isRunning = (session->getState() == Session::RUNNING);

    if (wasRunning == isRunning) {
        return;
    }

    // Currently we only have 1 running session, and we always put the previous
    // session in non-running state before we run the new session, so it's okay
    // to start/stop the watchdog here. If this assumption changes, we need to
    // track the number of running sessions and start/stop watchdog based on that.
    if (isRunning) {
        mWatchdog->start(session->key);
    } else {
        mWatchdog->stop();
    }
}

void TranscodingSessionController::Session::setState(Session::State newState) {
    if (state == newState) {
        return;
@@ -206,12 +355,17 @@ void TranscodingSessionController::updateCurrentSession_l() {
    // take some actions to ensure it's running.
    if (topSession != curSession ||
        (shouldBeRunning ^ (topSession->getState() == Session::RUNNING))) {
        if (mTranscoder == nullptr) {
            mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
            mWatchdog = std::make_shared<Watchdog>(this, kWatchdogTimeoutUs);
        }

        // If current session is running, pause it first. Note this is true for either
        // cases: 1) If top session is changing, or 2) if top session is not changing but
        // the topSession's state is changing.
        if (curSession != nullptr && curSession->getState() == Session::RUNNING) {
            mTranscoder->pause(curSession->key.first, curSession->key.second);
            curSession->setState(Session::PAUSED);
            setSessionState_l(curSession, Session::PAUSED);
        }
        // If we are not experiencing resource loss nor thermal throttling, we can start
        // or resume the topSession now.
@@ -223,7 +377,7 @@ void TranscodingSessionController::updateCurrentSession_l() {
                mTranscoder->resume(topSession->key.first, topSession->key.second,
                                    topSession->request, topSession->callback.lock());
            }
            topSession->setState(Session::RUNNING);
            setSessionState_l(topSession, Session::RUNNING);
        }
    }
    mCurrentSession = topSession;
@@ -264,7 +418,7 @@ void TranscodingSessionController::removeSession_l(const SessionKeyType& session
        mCurrentSession = nullptr;
    }

    mSessionMap[sessionKey].setState(finalState);
    setSessionState_l(&mSessionMap[sessionKey], finalState);
    mSessionHistory.push_back(mSessionMap[sessionKey]);
    if (mSessionHistory.size() > kSessionHistoryMax) {
        mSessionHistory.erase(mSessionHistory.begin());
@@ -361,7 +515,7 @@ bool TranscodingSessionController::submit(
    mSessionMap[sessionKey].pauseCount = 0;
    mSessionMap[sessionKey].request = request;
    mSessionMap[sessionKey].callback = callback;
    mSessionMap[sessionKey].setState(Session::NOT_STARTED);
    setSessionState_l(&mSessionMap[sessionKey], Session::NOT_STARTED);

    // If it's an offline session, the queue was already added in constructor.
    // If it's a real-time sessions, check if a queue is already present for the uid,
@@ -527,6 +681,15 @@ void TranscodingSessionController::onFinish(ClientIdType clientId, SessionIdType
void TranscodingSessionController::onError(ClientIdType clientId, SessionIdType sessionId,
                                           TranscodingErrorCode err) {
    notifyClient(clientId, sessionId, "error", [=](const SessionKeyType& sessionKey) {
        if (err == TranscodingErrorCode::kWatchdogTimeout) {
            // Abandon the transcoder, as its handler thread might be stuck in some call to
            // MediaTranscoder altogether, and may not be able to handle any new tasks.
            mTranscoder->stop(clientId, sessionId, true /*abandon*/);
            // Clear the last ref count before we create new transcoder.
            mTranscoder = nullptr;
            mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
        }

        {
            auto clientCallback = mSessionMap[sessionKey].callback.lock();
            if (clientCallback != nullptr) {
@@ -555,6 +718,11 @@ void TranscodingSessionController::onProgressUpdate(ClientIdType clientId, Sessi
    });
}

void TranscodingSessionController::onHeartBeat(ClientIdType clientId, SessionIdType sessionId) {
    notifyClient(clientId, sessionId, "heart-beat",
                 [=](const SessionKeyType& /*sessionKey*/) { mWatchdog->keepAlive(); });
}

void TranscodingSessionController::onResourceLost(ClientIdType clientId, SessionIdType sessionId) {
    ALOGI("%s", __FUNCTION__);

@@ -572,7 +740,7 @@ void TranscodingSessionController::onResourceLost(ClientIdType clientId, Session
        // If we receive a resource loss event, the transcoder already paused the transcoding,
        // so we don't need to call onPaused() to pause it. However, we still need to notify
        // the client and update the session state here.
        resourceLostSession->setState(Session::PAUSED);
        setSessionState_l(resourceLostSession, Session::PAUSED);
        // Notify the client as a paused event.
        auto clientCallback = resourceLostSession->callback.lock();
        if (clientCallback != nullptr) {
+1 −0
Original line number Diff line number Diff line
@@ -31,4 +31,5 @@ enum TranscodingErrorCode {
    kInvalidOperation = 5,
    kErrorIO = 6,
    kInsufficientResources = 7,
    kWatchdogTimeout = 8,
}
 No newline at end of file
+4 −2
Original line number Diff line number Diff line
@@ -32,7 +32,6 @@ class TranscoderCallbackInterface;
// Interface for the controller to call the transcoder to take actions.
class TranscoderInterface {
public:
    virtual void setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) = 0;
    virtual void start(ClientIdType clientId, SessionIdType sessionId,
                       const TranscodingRequestParcel& request,
                       const std::shared_ptr<ITranscodingClientCallback>& clientCallback) = 0;
@@ -40,7 +39,9 @@ public:
    virtual void resume(ClientIdType clientId, SessionIdType sessionId,
                        const TranscodingRequestParcel& request,
                        const std::shared_ptr<ITranscodingClientCallback>& clientCallback) = 0;
    virtual void stop(ClientIdType clientId, SessionIdType sessionId) = 0;
    // Stop the specified session. If abandon is true, the transcoder wrapper will be discarded
    // after the session stops.
    virtual void stop(ClientIdType clientId, SessionIdType sessionId, bool abandon = false) = 0;

protected:
    virtual ~TranscoderInterface() = default;
@@ -59,6 +60,7 @@ public:
                         TranscodingErrorCode err) = 0;
    virtual void onProgressUpdate(ClientIdType clientId, SessionIdType sessionId,
                                  int32_t progress) = 0;
    virtual void onHeartBeat(ClientIdType clientId, SessionIdType sessionId) = 0;

    // Called when transcoding becomes temporarily inaccessible due to loss of resource.
    // If there is any session currently running, it will be paused. When resource contention
+29 −11
Original line number Diff line number Diff line
@@ -36,22 +36,36 @@ class Parcelable;
class TranscoderWrapper : public TranscoderInterface,
                          public std::enable_shared_from_this<TranscoderWrapper> {
public:
    TranscoderWrapper();
    TranscoderWrapper(const std::shared_ptr<TranscoderCallbackInterface>& cb,
                      int64_t heartBeatIntervalUs);
    ~TranscoderWrapper();

    virtual void setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) override;
    virtual void start(ClientIdType clientId, SessionIdType sessionId,
    // TranscoderInterface
    void start(ClientIdType clientId, SessionIdType sessionId,
               const TranscodingRequestParcel& request,
               const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
    virtual void pause(ClientIdType clientId, SessionIdType sessionId) override;
    virtual void resume(ClientIdType clientId, SessionIdType sessionId,
    void pause(ClientIdType clientId, SessionIdType sessionId) override;
    void resume(ClientIdType clientId, SessionIdType sessionId,
                const TranscodingRequestParcel& request,
                const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
    virtual void stop(ClientIdType clientId, SessionIdType sessionId) override;
    void stop(ClientIdType clientId, SessionIdType sessionId, bool abandon = false) override;
    // ~TranscoderInterface

private:
    class CallbackImpl;
    struct Event {
        enum Type { NoEvent, Start, Pause, Resume, Stop, Finish, Error, Progress } type;
        enum Type {
            NoEvent,
            Start,
            Pause,
            Resume,
            Stop,
            Finish,
            Error,
            Progress,
            HeartBeat,
            Abandon
        } type;
        ClientIdType clientId;
        SessionIdType sessionId;
        std::function<void()> runnable;
@@ -62,17 +76,21 @@ private:
    std::shared_ptr<CallbackImpl> mTranscoderCb;
    std::shared_ptr<MediaTranscoder> mTranscoder;
    std::weak_ptr<TranscoderCallbackInterface> mCallback;
    int64_t mHeartBeatIntervalUs;
    std::mutex mLock;
    std::condition_variable mCondition;
    std::list<Event> mQueue;  // GUARDED_BY(mLock);
    std::map<SessionKeyType, std::shared_ptr<ndk::ScopedAParcel>> mPausedStateMap;
    ClientIdType mCurrentClientId;
    SessionIdType mCurrentSessionId;
    // Whether the looper has been created.
    bool mLooperReady;

    static std::string toString(const Event& event);
    void onFinish(ClientIdType clientId, SessionIdType sessionId);
    void onError(ClientIdType clientId, SessionIdType sessionId, media_status_t status);
    void onProgress(ClientIdType clientId, SessionIdType sessionId, int32_t progress);
    void onHeartBeat(ClientIdType clientId, SessionIdType sessionId);

    media_status_t handleStart(ClientIdType clientId, SessionIdType sessionId,
                               const TranscodingRequestParcel& request,
Loading