Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e51446c7 authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "transcoding: drop transcoding request for bad actors" into sc-dev

parents 8eb1a42e 87d199cf
Loading
Loading
Loading
Loading
+145 −42
Original line number Diff line number Diff line
@@ -87,15 +87,12 @@ private:
    // Whether watchdog is aborted and the monitoring thread should exit.
    bool mAbort GUARDED_BY(mLock);
    // When watchdog is active, the next timeout time point.
    std::chrono::system_clock::time_point mNextTimeoutTime GUARDED_BY(mLock);
    std::chrono::steady_clock::time_point mNextTimeoutTime GUARDED_BY(mLock);
    // When watchdog is active, the session being watched.
    SessionKeyType mSessionToWatch GUARDED_BY(mLock);
    std::thread mThread;
};

static constexpr int64_t kWatchdogTimeoutUs = 3000000LL;
static constexpr int64_t kTranscoderHeartBeatIntervalUs = 1000000LL;

TranscodingSessionController::Watchdog::Watchdog(TranscodingSessionController* owner,
                                                 int64_t timeoutUs)
      : mOwner(owner),
@@ -159,7 +156,7 @@ void TranscodingSessionController::Watchdog::keepAlive() {
// updateTimer_l() is only called with lock held.
void TranscodingSessionController::Watchdog::updateTimer_l() NO_THREAD_SAFETY_ANALYSIS {
    std::chrono::microseconds timeout(mTimeoutUs);
    mNextTimeoutTime = std::chrono::system_clock::now() + timeout;
    mNextTimeoutTime = std::chrono::steady_clock::now() + timeout;
}

// Unfortunately std::unique_lock is incompatible with -Wthread-safety.
@@ -187,13 +184,85 @@ void TranscodingSessionController::Watchdog::threadLoop() NO_THREAD_SAFETY_ANALY
        }
    }
}
///////////////////////////////////////////////////////////////////////////////
struct TranscodingSessionController::Pacer {
    Pacer(const ControllerConfig& config)
          : mBurstThresholdMs(config.pacerBurstThresholdMs),
            mBurstCountQuota(config.pacerBurstCountQuota),
            mBurstTimeQuotaSec(config.pacerBurstTimeQuotaSeconds) {}

    ~Pacer() = default;

    void onSessionCompleted(uid_t uid, std::chrono::microseconds runningTime);
    bool onSessionStarted(uid_t uid);

private:
    // Threshold of time between finish/start below which a back-to-back start is counted.
    int32_t mBurstThresholdMs;
    // Maximum allowed back-to-back start count.
    int32_t mBurstCountQuota;
    // Maximum allowed back-to-back running time.
    int32_t mBurstTimeQuotaSec;

    struct UidHistoryEntry {
        std::chrono::steady_clock::time_point lastCompletedTime;
        int32_t burstCount = 0;
        std::chrono::steady_clock::duration burstDuration{0};
    };
    std::map<uid_t, UidHistoryEntry> mUidHistoryMap;
};

void TranscodingSessionController::Pacer::onSessionCompleted(
        uid_t uid, std::chrono::microseconds runningTime) {
    if (mUidHistoryMap.find(uid) == mUidHistoryMap.end()) {
        mUidHistoryMap.emplace(uid, UidHistoryEntry{});
    }
    mUidHistoryMap[uid].lastCompletedTime = std::chrono::steady_clock::now();
    mUidHistoryMap[uid].burstCount++;
    mUidHistoryMap[uid].burstDuration += runningTime;
}

bool TranscodingSessionController::Pacer::onSessionStarted(uid_t uid) {
    // If uid doesn't exist, this uid has no completed sessions. Skip.
    if (mUidHistoryMap.find(uid) == mUidHistoryMap.end()) {
        return true;
    }

    // TODO: if Thermal throttling or resoure lost happened to occurr between this start
    // and the previous completion, we should deduct the paused time from the elapsed time.
    // (Individual session's pause time, on the other hand, doesn't need to be deducted
    // because it doesn't affect the gap between last completion and the start.
    auto timeSinceLastComplete =
            std::chrono::steady_clock::now() - mUidHistoryMap[uid].lastCompletedTime;
    if (mUidHistoryMap[uid].burstCount >= mBurstCountQuota &&
        mUidHistoryMap[uid].burstDuration >= std::chrono::seconds(mBurstTimeQuotaSec)) {
        ALOGW("Pacer: uid %d: over quota, burst count %d, time %lldms", uid,
              mUidHistoryMap[uid].burstCount, (long long)mUidHistoryMap[uid].burstDuration.count());
        return false;
    }

    // If not over quota, allow the session, and reset as long as this is not too close
    // to previous completion.
    if (timeSinceLastComplete > std::chrono::milliseconds(mBurstThresholdMs)) {
        ALOGV("Pacer: uid %d: reset quota", uid);
        mUidHistoryMap[uid].burstCount = 0;
        mUidHistoryMap[uid].burstDuration = std::chrono::milliseconds(0);
    } else {
        ALOGV("Pacer: uid %d: burst count %d, time %lldms", uid, mUidHistoryMap[uid].burstCount,
              (long long)mUidHistoryMap[uid].burstDuration.count());
    }

    return true;
}

///////////////////////////////////////////////////////////////////////////////

TranscodingSessionController::TranscodingSessionController(
        const TranscoderFactoryType& transcoderFactory,
        const std::shared_ptr<UidPolicyInterface>& uidPolicy,
        const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy,
        const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy)
        const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy,
        const ControllerConfig* config)
      : mTranscoderFactory(transcoderFactory),
        mUidPolicy(uidPolicy),
        mResourcePolicy(resourcePolicy),
@@ -206,6 +275,13 @@ TranscodingSessionController::TranscodingSessionController(
    mSessionQueues.emplace(OFFLINE_UID, SessionQueueType());
    mUidPackageNames[OFFLINE_UID] = "(offline)";
    mThermalThrottling = thermalPolicy->getThrottlingStatus();
    if (config != nullptr) {
        mConfig = *config;
    }
    mPacer.reset(new Pacer(mConfig));
    ALOGD("@@@ watchdog %lld, burst count %d, burst time %d, burst threshold %d",
          (long long)mConfig.watchdogTimeoutUs, mConfig.pacerBurstCountQuota,
          mConfig.pacerBurstTimeQuotaSeconds, mConfig.pacerBurstThresholdMs);
}

TranscodingSessionController::~TranscodingSessionController() {}
@@ -280,10 +356,21 @@ void TranscodingSessionController::dumpAllSessions(int fd, const Vector<String16
    write(fd, result.string(), result.size());
}

/*
 * Returns nullptr if there is no session, or we're paused globally (due to resource lost,
 * thermal throttling, etc.). Otherwise, return the session that should be run next.
 */
TranscodingSessionController::Session* TranscodingSessionController::getTopSession_l() {
    if (mSessionMap.empty()) {
        return nullptr;
    }

    // Return nullptr if we're paused globally due to resource lost or thermal throttling.
    if (((mResourcePolicy != nullptr && mResourceLost) ||
         (mThermalPolicy != nullptr && mThermalThrottling))) {
        return nullptr;
    }

    uid_t topUid = *mUidSortedList.begin();
    SessionKeyType topSessionKey = *mSessionQueues[topUid].begin();
    return &mSessionMap[topSessionKey];
@@ -313,9 +400,10 @@ void TranscodingSessionController::Session::setState(Session::State newState) {
    if (state == newState) {
        return;
    }
    auto nowTime = std::chrono::system_clock::now();
    auto nowTime = std::chrono::steady_clock::now();
    if (state != INVALID) {
        std::chrono::microseconds elapsedTime = (nowTime - stateEnterTime);
        std::chrono::microseconds elapsedTime =
                std::chrono::duration_cast<std::chrono::microseconds>(nowTime - stateEnterTime);
        switch (state) {
        case PAUSED:
            pausedTime = pausedTime + elapsedTime;
@@ -338,49 +426,60 @@ void TranscodingSessionController::Session::setState(Session::State newState) {
}

void TranscodingSessionController::updateCurrentSession_l() {
    Session* topSession = getTopSession_l();
    Session* curSession = mCurrentSession;
    ALOGV("updateCurrentSession: topSession is %s, curSession is %s",
          topSession == nullptr ? "null" : sessionToString(topSession->key).c_str(),
          curSession == nullptr ? "null" : sessionToString(curSession->key).c_str());

    if (topSession == nullptr) {
        mCurrentSession = nullptr;
        return;
    }
    Session* topSession = getTopSession_l();

    bool shouldBeRunning = !((mResourcePolicy != nullptr && mResourceLost) ||
                             (mThermalPolicy != nullptr && mThermalThrottling));
    // If we found a topSession that should be run, and it's not already running,
    // take some actions to ensure it's running.
    if (topSession != curSession ||
        (shouldBeRunning ^ (topSession->getState() == Session::RUNNING))) {
    // Delayed init of transcoder and watchdog.
    if (mTranscoder == nullptr) {
            mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
            mWatchdog = std::make_shared<Watchdog>(this, kWatchdogTimeoutUs);
        mTranscoder = mTranscoderFactory(shared_from_this());
        mWatchdog = std::make_shared<Watchdog>(this, mConfig.watchdogTimeoutUs);
    }

        // If current session is running, pause it first. Note this is true for either
        // cases: 1) If top session is changing, or 2) if top session is not changing but
        // the topSession's state is changing.
    // If we found a different top session, or the top session's running state is not
    // correct. Take some actions to ensure it's correct.
    while ((topSession = getTopSession_l()) != curSession ||
           (topSession != nullptr && !topSession->isRunning())) {
        ALOGV("updateCurrentSession_l: topSession is %s, curSession is %s",
              topSession == nullptr ? "null" : sessionToString(topSession->key).c_str(),
              curSession == nullptr ? "null" : sessionToString(curSession->key).c_str());

        // If current session is running, pause it first. Note this is needed for either
        // cases: 1) Top session is changing to another session, or 2) Top session is
        // changing to null (which means we should be globally paused).
        if (curSession != nullptr && curSession->getState() == Session::RUNNING) {
            mTranscoder->pause(curSession->key.first, curSession->key.second);
            setSessionState_l(curSession, Session::PAUSED);
        }
        // If we are not experiencing resource loss nor thermal throttling, we can start
        // or resume the topSession now.
        if (shouldBeRunning) {

        if (topSession == nullptr) {
            // Nothing more to run (either no session or globally paused).
            break;
        }

        // Otherwise, ensure topSession is running.
        if (topSession->getState() == Session::NOT_STARTED) {
                mTranscoder->start(topSession->key.first, topSession->key.second,
                                   topSession->request, topSession->callingUid,
                                   topSession->callback.lock());
            } else if (topSession->getState() == Session::PAUSED) {
                mTranscoder->resume(topSession->key.first, topSession->key.second,
                                    topSession->request, topSession->callingUid,
                                    topSession->callback.lock());
            if (!mPacer->onSessionStarted(topSession->clientUid)) {
                // Unfortunately this uid is out of quota for new sessions.
                // Drop this sesion and try another one.
                {
                    auto clientCallback = mSessionMap[topSession->key].callback.lock();
                    if (clientCallback != nullptr) {
                        clientCallback->onTranscodingFailed(
                                topSession->key.second, TranscodingErrorCode::kDroppedByService);
                    }
                }
                removeSession_l(topSession->key, Session::DROPPED_BY_PACER);
                continue;
            }
            mTranscoder->start(topSession->key.first, topSession->key.second, topSession->request,
                               topSession->callingUid, topSession->callback.lock());
            setSessionState_l(topSession, Session::RUNNING);
        } else if (topSession->getState() == Session::PAUSED) {
            mTranscoder->resume(topSession->key.first, topSession->key.second, topSession->request,
                                topSession->callingUid, topSession->callback.lock());
            setSessionState_l(topSession, Session::RUNNING);
        }
        break;
    }
    mCurrentSession = topSession;
}
@@ -421,6 +520,12 @@ void TranscodingSessionController::removeSession_l(const SessionKeyType& session
    }

    setSessionState_l(&mSessionMap[sessionKey], finalState);

    if (finalState == Session::FINISHED || finalState == Session::ERROR) {
        mPacer->onSessionCompleted(mSessionMap[sessionKey].clientUid,
                                   mSessionMap[sessionKey].runningTime);
    }

    mSessionHistory.push_back(mSessionMap[sessionKey]);
    if (mSessionHistory.size() > kSessionHistoryMax) {
        mSessionHistory.erase(mSessionHistory.begin());
@@ -514,8 +619,6 @@ bool TranscodingSessionController::submit(
    mSessionMap[sessionKey].key = sessionKey;
    mSessionMap[sessionKey].clientUid = clientUid;
    mSessionMap[sessionKey].callingUid = callingUid;
    mSessionMap[sessionKey].lastProgress = 0;
    mSessionMap[sessionKey].pauseCount = 0;
    mSessionMap[sessionKey].request = request;
    mSessionMap[sessionKey].callback = callback;
    setSessionState_l(&mSessionMap[sessionKey], Session::NOT_STARTED);
@@ -690,7 +793,7 @@ void TranscodingSessionController::onError(ClientIdType clientId, SessionIdType
            mTranscoder->stop(clientId, sessionId, true /*abandon*/);
            // Clear the last ref count before we create new transcoder.
            mTranscoder = nullptr;
            mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
            mTranscoder = mTranscoderFactory(shared_from_this());
        }

        {
+14 −8
Original line number Diff line number Diff line
@@ -23,13 +23,19 @@ package android.media;
 */
@Backing(type = "int")
enum TranscodingErrorCode {
    // Errors exposed to client side.
    kNoError = 0,
    kUnknown = 1,
    kMalformed = 2,
    kUnsupported = 3,
    kInvalidParameter = 4,
    kInvalidOperation = 5,
    kErrorIO = 6,
    kInsufficientResources = 7,
    kWatchdogTimeout = 8,
    kDroppedByService = 1,
    kServiceUnavailable = 2,

    // Other private errors.
    kPrivateErrorFirst     = 1000,
    kUnknown               = kPrivateErrorFirst + 0,
    kMalformed             = kPrivateErrorFirst + 1,
    kUnsupported           = kPrivateErrorFirst + 2,
    kInvalidParameter      = kPrivateErrorFirst + 3,
    kInvalidOperation      = kPrivateErrorFirst + 4,
    kErrorIO               = kPrivateErrorFirst + 5,
    kInsufficientResources = kPrivateErrorFirst + 6,
    kWatchdogTimeout       = kPrivateErrorFirst + 7,
}
 No newline at end of file
+26 −8
Original line number Diff line number Diff line
@@ -93,7 +93,18 @@ private:
    using SessionKeyType = std::pair<ClientIdType, SessionIdType>;
    using SessionQueueType = std::list<SessionKeyType>;
    using TranscoderFactoryType = std::function<std::shared_ptr<TranscoderInterface>(
            const std::shared_ptr<TranscoderCallbackInterface>&, int64_t)>;
            const std::shared_ptr<TranscoderCallbackInterface>&)>;

    struct ControllerConfig {
        // Watchdog timeout.
        int64_t watchdogTimeoutUs = 3000000LL;
        // Threshold of time between finish/start below which a back-to-back start is counted.
        int32_t pacerBurstThresholdMs = 1000;
        // Maximum allowed back-to-back start count.
        int32_t pacerBurstCountQuota = 10;
        // Maximum allowed back-to-back running time.
        int32_t pacerBurstTimeQuotaSeconds = 180;  // 3-min
    };

    struct Session {
        enum State {
@@ -106,16 +117,17 @@ private:
            FINISHED,
            CANCELED,
            ERROR,
            DROPPED_BY_PACER,
        };
        SessionKeyType key;
        uid_t clientUid;
        uid_t callingUid;
        int32_t lastProgress;
        int32_t pauseCount;
        std::chrono::time_point<std::chrono::system_clock> stateEnterTime;
        std::chrono::microseconds waitingTime;
        std::chrono::microseconds runningTime;
        std::chrono::microseconds pausedTime;
        int32_t lastProgress = 0;
        int32_t pauseCount = 0;
        std::chrono::time_point<std::chrono::steady_clock> stateEnterTime;
        std::chrono::microseconds waitingTime{0};
        std::chrono::microseconds runningTime{0};
        std::chrono::microseconds pausedTime{0};

        TranscodingRequest request;
        std::weak_ptr<ITranscodingClientCallback> callback;
@@ -123,12 +135,16 @@ private:
        // Must use setState to change state.
        void setState(Session::State state);
        State getState() const { return state; }
        bool isRunning() { return state == RUNNING; }

    private:
        State state = INVALID;
    };

    struct Watchdog;
    struct Pacer;

    ControllerConfig mConfig;

    // TODO(chz): call transcoder without global lock.
    // Use mLock for all entrypoints for now.
@@ -156,12 +172,14 @@ private:
    bool mThermalThrottling;
    std::list<Session> mSessionHistory;
    std::shared_ptr<Watchdog> mWatchdog;
    std::shared_ptr<Pacer> mPacer;

    // Only allow MediaTranscodingService and unit tests to instantiate.
    TranscodingSessionController(const TranscoderFactoryType& transcoderFactory,
                                 const std::shared_ptr<UidPolicyInterface>& uidPolicy,
                                 const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy,
                                 const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy);
                                 const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy,
                                 const ControllerConfig* config = nullptr);

    void dumpSession_l(const Session& session, String8& result, bool closedSession = false);
    Session* getTopSession_l();
+94 −23

File changed.

Preview size limit exceeded, changes collapsed.

+2 −2
Original line number Diff line number Diff line
@@ -237,8 +237,8 @@ media_status_t MediaSampleWriter::runWriterLoop(bool* wasStopped) NO_THREAD_SAFE
    }

    std::chrono::microseconds updateInterval(mHeartBeatIntervalUs);
    std::chrono::system_clock::time_point nextUpdateTime =
            std::chrono::system_clock::now() + updateInterval;
    std::chrono::steady_clock::time_point nextUpdateTime =
            std::chrono::steady_clock::now() + updateInterval;

    while (true) {
        if (trackEosCount >= mTracks.size()) {
Loading