Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 20fcdc6e authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "transcoding: add uid state based scheduling policy"

parents 9e6c2135 acb3350e
Loading
Loading
Loading
Loading
+2 −0
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ cc_library_shared {
    srcs: [
        "TranscodingClientManager.cpp",
        "TranscodingJobScheduler.cpp",
        "TranscodingUidPolicy.cpp",
    ],

    shared_libs: [
@@ -47,6 +48,7 @@ cc_library_shared {
        "liblog",
        "libutils",
        "libmediatranscoder",
        "libbinder",
    ],

    export_include_dirs: ["include"],
+9 −8
Original line number Diff line number Diff line
@@ -43,7 +43,8 @@ struct TranscodingClientManager::ClientImpl : public BnTranscodingClient {
     * object doesn't get created again, otherwise the binder object pointer
     * may not be unique.
     */
    SpAIBinder mClientCallback;
    SpAIBinder mClientBinder;
    std::shared_ptr<ITranscodingClientCallback> mClientCallback;
    /* A unique id assigned to the client by the service. This number is used
     * by the service for indexing. Here we use the binder object's pointer
     * (casted to int64t_t) as the client id.
@@ -78,8 +79,9 @@ TranscodingClientManager::ClientImpl::ClientImpl(
        const std::shared_ptr<ITranscodingClientCallback>& callback, pid_t pid, uid_t uid,
        const std::string& clientName, const std::string& opPackageName,
        TranscodingClientManager* owner)
      : mClientCallback((callback != nullptr) ? callback->asBinder() : nullptr),
        mClientId((int64_t)mClientCallback.get()),
      : mClientBinder((callback != nullptr) ? callback->asBinder() : nullptr),
        mClientCallback(callback),
        mClientId((int64_t)mClientBinder.get()),
        mClientPid(pid),
        mClientUid(uid),
        mClientName(clientName),
@@ -98,9 +100,8 @@ Status TranscodingClientManager::ClientImpl::submitRequest(

    int32_t jobId = mNextJobId.fetch_add(1);

    *_aidl_return =
            mOwner->mJobScheduler->submit(mClientId, jobId, mClientUid, in_request,
                                          ITranscodingClientCallback::fromBinder(mClientCallback));
    *_aidl_return = mOwner->mJobScheduler->submit(mClientId, jobId, mClientUid, in_request,
                                                  mClientCallback);

    if (*_aidl_return) {
        out_job->jobId = jobId;
@@ -205,7 +206,7 @@ status_t TranscodingClientManager::addClient(
          (long long)client->mClientId, client->mClientPid, client->mClientUid,
          client->mClientName.c_str(), client->mClientOpPackageName.c_str());

    AIBinder_linkToDeath(client->mClientCallback.get(), mDeathRecipient.get(),
    AIBinder_linkToDeath(client->mClientBinder.get(), mDeathRecipient.get(),
                         reinterpret_cast<void*>(client.get()));

    // Adds the new client to the map.
@@ -227,7 +228,7 @@ status_t TranscodingClientManager::removeClient(ClientIdType clientId) {
        return INVALID_OPERATION;
    }

    SpAIBinder callback = it->second->mClientCallback;
    SpAIBinder callback = it->second->mClientBinder;

    // Check if the client still live. If alive, unlink the death.
    if (callback.get() != nullptr) {
+106 −23
Original line number Diff line number Diff line
@@ -106,6 +106,10 @@ void TranscodingJobScheduler::removeJob_l(const JobKeyType& jobKey) {
    if (uid != OFFLINE_UID && jobQueue.empty()) {
        mUidSortedList.remove(uid);
        mJobQueues.erase(uid);
        mUidPolicy->unregisterMonitorUid(uid);

        std::unordered_set<uid_t> topUids = mUidPolicy->getTopUids();
        moveUidsToTop_l(topUids, false /*preserveTopUid*/);
    }

    // Clear current job.
@@ -117,6 +121,59 @@ void TranscodingJobScheduler::removeJob_l(const JobKeyType& jobKey) {
    mJobMap.erase(jobKey);
}

/**
 * Moves the set of uids to the front of mUidSortedList (which is used to pick
 * the next job to run).
 *
 * This is called when 1) we received a onTopUidsChanged() callbcak from UidPolicy,
 * or 2) we removed the job queue for a uid because it becomes empty.
 *
 * In case of 1), if there are multiple uids in the set, and the current front
 * uid in mUidSortedList is still in the set, we try to keep that uid at front
 * so that current job run is not interrupted. (This is not a concern for case 2)
 * because the queue for a uid was just removed entirely.)
 */
void TranscodingJobScheduler::moveUidsToTop_l(const std::unordered_set<uid_t>& uids,
                                              bool preserveTopUid) {
    // If uid set is empty, nothing to do. Do not change the queue status.
    if (uids.empty()) {
        return;
    }

    // Save the current top uid.
    uid_t curTopUid = *mUidSortedList.begin();
    bool pushCurTopToFront = false;
    int32_t numUidsMoved = 0;

    // Go through the sorted uid list once, and move the ones in top set to front.
    for (auto it = mUidSortedList.begin(); it != mUidSortedList.end();) {
        uid_t uid = *it;

        if (uid != OFFLINE_UID && uids.count(uid) > 0) {
            it = mUidSortedList.erase(it);

            // If this is the top we're preserving, don't push it here, push
            // it after the for-loop.
            if (uid == curTopUid && preserveTopUid) {
                pushCurTopToFront = true;
            } else {
                mUidSortedList.push_front(uid);
            }

            // If we found all uids in the set, break out.
            if (++numUidsMoved == uids.size()) {
                break;
            }
        } else {
            ++it;
        }
    }

    if (pushCurTopToFront) {
        mUidSortedList.push_front(curTopUid);
    }
}

bool TranscodingJobScheduler::submit(ClientIdType clientId, int32_t jobId, uid_t uid,
                                     const TranscodingRequestParcel& request,
                                     const std::weak_ptr<ITranscodingClientCallback>& callback) {
@@ -150,6 +207,7 @@ bool TranscodingJobScheduler::submit(ClientIdType clientId, int32_t jobId, uid_t
    // and add a new queue if needed.
    if (uid != OFFLINE_UID) {
        if (mJobQueues.count(uid) == 0) {
            mUidPolicy->registerMonitorUid(uid);
            if (mUidPolicy->isUidOnTop(uid)) {
                mUidSortedList.push_front(uid);
            } else {
@@ -222,7 +280,7 @@ void TranscodingJobScheduler::onFinish(ClientIdType clientId, int32_t jobId) {
    std::scoped_lock lock{mLock};

    if (mJobMap.count(jobKey) == 0) {
        ALOGW("ignoring abort for non-existent job");
        ALOGW("ignoring finish for non-existent job");
        return;
    }

@@ -230,7 +288,7 @@ void TranscodingJobScheduler::onFinish(ClientIdType clientId, int32_t jobId) {
    // to client if the job is paused. Transcoder could have posted finish when
    // we're pausing it, and the finish arrived after we changed current job.
    if (mJobMap[jobKey].state == Job::NOT_STARTED) {
        ALOGW("ignoring abort for job that was never started");
        ALOGW("ignoring finish for job that was never started");
        return;
    }

@@ -258,7 +316,7 @@ void TranscodingJobScheduler::onError(int64_t clientId, int32_t jobId, Transcodi
    std::scoped_lock lock{mLock};

    if (mJobMap.count(jobKey) == 0) {
        ALOGW("ignoring abort for non-existent job");
        ALOGW("ignoring error for non-existent job");
        return;
    }

@@ -266,7 +324,7 @@ void TranscodingJobScheduler::onError(int64_t clientId, int32_t jobId, Transcodi
    // to client if the job is paused. Transcoder could have posted finish when
    // we're pausing it, and the finish arrived after we changed current job.
    if (mJobMap[jobKey].state == Job::NOT_STARTED) {
        ALOGW("ignoring abort for job that was never started");
        ALOGW("ignoring error for job that was never started");
        return;
    }

@@ -286,6 +344,34 @@ void TranscodingJobScheduler::onError(int64_t clientId, int32_t jobId, Transcodi
    validateState_l();
}

void TranscodingJobScheduler::onProgressUpdate(int64_t clientId, int32_t jobId, int32_t progress) {
    JobKeyType jobKey = std::make_pair(clientId, jobId);

    ALOGV("%s: job %s, progress %d", __FUNCTION__, jobToString(jobKey).c_str(), progress);

    std::scoped_lock lock{mLock};

    if (mJobMap.count(jobKey) == 0) {
        ALOGW("ignoring progress for non-existent job");
        return;
    }

    // Only ignore if job was never started. In particular, propagate the status
    // to client if the job is paused. Transcoder could have posted finish when
    // we're pausing it, and the finish arrived after we changed current job.
    if (mJobMap[jobKey].state == Job::NOT_STARTED) {
        ALOGW("ignoring progress for job that was never started");
        return;
    }

    {
        auto clientCallback = mJobMap[jobKey].callback.lock();
        if (clientCallback != nullptr) {
            clientCallback->onProgressUpdate(jobId, progress);
        }
    }
}

void TranscodingJobScheduler::onResourceLost() {
    ALOGV("%s", __FUNCTION__);

@@ -302,28 +388,25 @@ void TranscodingJobScheduler::onResourceLost() {
    validateState_l();
}

void TranscodingJobScheduler::onTopUidChanged(uid_t uid) {
    ALOGV("%s: uid %d", __FUNCTION__, uid);

    std::scoped_lock lock{mLock};

    if (uid == OFFLINE_UID) {
        ALOGW("%s: ignoring invalid uid %d", __FUNCTION__, uid);
void TranscodingJobScheduler::onTopUidsChanged(const std::unordered_set<uid_t>& uids) {
    if (uids.empty()) {
        ALOGW("%s: ignoring empty uids", __FUNCTION__);
        return;
    }
    // If this uid doesn't have any jobs, we don't care about it.
    if (mJobQueues.count(uid) == 0) {
        ALOGW("%s: ignoring uid %d without any jobs", __FUNCTION__, uid);
        return;

    std::string uidStr;
    for (auto it = uids.begin(); it != uids.end(); it++) {
        if (!uidStr.empty()) {
            uidStr += ", ";
        }
    // If this uid is already top, don't do anything.
    if (uid == *mUidSortedList.begin()) {
        ALOGW("%s: uid %d is already top", __FUNCTION__, uid);
        return;
        uidStr += std::to_string(*it);
    }

    mUidSortedList.remove(uid);
    mUidSortedList.push_front(uid);
    ALOGD("%s: topUids: size %zu, uids: %s", __FUNCTION__, uids.size(), uidStr.c_str());

    std::scoped_lock lock{mLock};

    moveUidsToTop_l(uids, true /*preserveTopUid*/);

    updateCurrentJob_l();

+247 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2020 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

// #define LOG_NDEBUG 0
#define LOG_TAG "TranscodingUidPolicy"

#include <binder/ActivityManager.h>
#include <cutils/misc.h>  // FIRST_APPLICATION_UID
#include <inttypes.h>
#include <media/TranscodingUidPolicy.h>
#include <utils/Log.h>

#include <utility>

namespace android {

constexpr static uid_t OFFLINE_UID = -1;
constexpr static const char* kTranscodingTag = "transcoding";

struct TranscodingUidPolicy::UidObserver : public BnUidObserver,
                                           public virtual IBinder::DeathRecipient {
    explicit UidObserver(TranscodingUidPolicy* owner) : mOwner(owner) {}

    // IUidObserver
    void onUidGone(uid_t uid, bool disabled) override;
    void onUidActive(uid_t uid) override;
    void onUidIdle(uid_t uid, bool disabled) override;
    void onUidStateChanged(uid_t uid, int32_t procState, int64_t procStateSeq,
                           int32_t capability) override;

    // IBinder::DeathRecipient implementation
    void binderDied(const wp<IBinder>& who) override;

    TranscodingUidPolicy* mOwner;
};

void TranscodingUidPolicy::UidObserver::onUidGone(uid_t uid __unused, bool disabled __unused) {}

void TranscodingUidPolicy::UidObserver::onUidActive(uid_t uid __unused) {}

void TranscodingUidPolicy::UidObserver::onUidIdle(uid_t uid __unused, bool disabled __unused) {}

void TranscodingUidPolicy::UidObserver::onUidStateChanged(uid_t uid, int32_t procState,
                                                          int64_t procStateSeq __unused,
                                                          int32_t capability __unused) {
    mOwner->onUidStateChanged(uid, procState);
}

void TranscodingUidPolicy::UidObserver::binderDied(const wp<IBinder>& /*who*/) {
    ALOGW("TranscodingUidPolicy: ActivityManager has died");
    // TODO(chz): this is a rare event (since if the AMS is dead, the system is
    // probably dead as well). But we should try to reconnect.
    mOwner->setUidObserverRegistered(false);
}

////////////////////////////////////////////////////////////////////////////

TranscodingUidPolicy::TranscodingUidPolicy()
      : mAm(std::make_shared<ActivityManager>()),
        mUidObserver(new UidObserver(this)),
        mRegistered(false),
        mTopUidState(ActivityManager::PROCESS_STATE_UNKNOWN) {
    registerSelf();
}

TranscodingUidPolicy::~TranscodingUidPolicy() {
    unregisterSelf();
}

void TranscodingUidPolicy::registerSelf() {
    status_t res = mAm->linkToDeath(mUidObserver.get());
    mAm->registerUidObserver(
            mUidObserver.get(),
            ActivityManager::UID_OBSERVER_GONE | ActivityManager::UID_OBSERVER_IDLE |
                    ActivityManager::UID_OBSERVER_ACTIVE | ActivityManager::UID_OBSERVER_PROCSTATE,
            ActivityManager::PROCESS_STATE_UNKNOWN, String16(kTranscodingTag));

    if (res == OK) {
        Mutex::Autolock _l(mUidLock);

        mRegistered = true;
        ALOGI("TranscodingUidPolicy: Registered with ActivityManager");
    } else {
        mAm->unregisterUidObserver(mUidObserver.get());
    }
}

void TranscodingUidPolicy::unregisterSelf() {
    mAm->unregisterUidObserver(mUidObserver.get());
    mAm->unlinkToDeath(mUidObserver.get());

    Mutex::Autolock _l(mUidLock);

    mRegistered = false;

    ALOGI("TranscodingUidPolicy: Unregistered with ActivityManager");
}

void TranscodingUidPolicy::setUidObserverRegistered(bool registered) {
    Mutex::Autolock _l(mUidLock);

    mRegistered = registered;
}

void TranscodingUidPolicy::setCallback(const std::shared_ptr<UidPolicyCallbackInterface>& cb) {
    mUidPolicyCallback = cb;
}

void TranscodingUidPolicy::registerMonitorUid(uid_t uid) {
    Mutex::Autolock _l(mUidLock);
    if (uid == OFFLINE_UID) {
        ALOGW("Ignoring the offline uid");
        return;
    }
    if (mUidStateMap.find(uid) != mUidStateMap.end()) {
        ALOGE("%s: Trying to register uid: %d which is already monitored!", __FUNCTION__, uid);
        return;
    }

    int32_t state = ActivityManager::PROCESS_STATE_UNKNOWN;
    if (mRegistered && mAm->isUidActiveOrForeground(uid, String16(kTranscodingTag))) {
        state = mAm->getUidProcessState(uid, String16(kTranscodingTag));
    }

    ALOGV("%s: inserting new uid: %u, procState %d", __FUNCTION__, uid, state);

    mUidStateMap.emplace(std::pair<uid_t, int32_t>(uid, state));
    mStateUidMap[state].insert(uid);

    updateTopUid_l();
}

void TranscodingUidPolicy::unregisterMonitorUid(uid_t uid) {
    Mutex::Autolock _l(mUidLock);

    auto it = mUidStateMap.find(uid);
    if (it == mUidStateMap.end()) {
        ALOGE("%s: Trying to unregister uid: %d which is not monitored!", __FUNCTION__, uid);
        return;
    }

    auto stateIt = mStateUidMap.find(it->second);
    if (stateIt != mStateUidMap.end()) {
        stateIt->second.erase(uid);
        if (stateIt->second.empty()) {
            mStateUidMap.erase(stateIt);
        }
    }
    mUidStateMap.erase(it);

    updateTopUid_l();
}

bool TranscodingUidPolicy::isUidOnTop(uid_t uid) {
    Mutex::Autolock _l(mUidLock);

    return mTopUidState != ActivityManager::PROCESS_STATE_UNKNOWN &&
           mTopUidState == getProcState_l(uid);
}

std::unordered_set<uid_t> TranscodingUidPolicy::getTopUids() const {
    Mutex::Autolock _l(mUidLock);

    if (mTopUidState == ActivityManager::PROCESS_STATE_UNKNOWN) {
        return std::unordered_set<uid_t>();
    }

    return mStateUidMap.at(mTopUidState);
}

void TranscodingUidPolicy::onUidStateChanged(uid_t uid, int32_t procState) {
    ALOGV("onUidStateChanged: %u, procState %d", uid, procState);

    bool topUidSetChanged = false;
    std::unordered_set<uid_t> topUids;
    {
        Mutex::Autolock _l(mUidLock);
        auto it = mUidStateMap.find(uid);
        if (it != mUidStateMap.end() && it->second != procState) {
            // Top set changed if 1) the uid is in the current top uid set, or 2) the
            // new procState is at least the same priority as the current top uid state.
            bool isUidCurrentTop = mTopUidState != ActivityManager::PROCESS_STATE_UNKNOWN &&
                                   mStateUidMap[mTopUidState].count(uid) > 0;
            bool isNewStateHigherThanTop = procState != ActivityManager::PROCESS_STATE_UNKNOWN &&
                                           (procState <= mTopUidState ||
                                            mTopUidState == ActivityManager::PROCESS_STATE_UNKNOWN);
            topUidSetChanged = (isUidCurrentTop || isNewStateHigherThanTop);

            // Move uid to the new procState.
            mStateUidMap[it->second].erase(uid);
            mStateUidMap[procState].insert(uid);
            it->second = procState;

            if (topUidSetChanged) {
                updateTopUid_l();

                // Make a copy of the uid set for callback.
                topUids = mStateUidMap[mTopUidState];
            }
        }
    }

    ALOGV("topUidSetChanged: %d", topUidSetChanged);

    if (topUidSetChanged) {
        auto callback = mUidPolicyCallback.lock();
        if (callback != nullptr) {
            callback->onTopUidsChanged(topUids);
        }
    }
}

void TranscodingUidPolicy::updateTopUid_l() {
    // Update top uid state.
    mTopUidState = ActivityManager::PROCESS_STATE_UNKNOWN;
    for (auto stateIt = mStateUidMap.begin(); stateIt != mStateUidMap.end(); stateIt++) {
        if (stateIt->first != ActivityManager::PROCESS_STATE_UNKNOWN && !stateIt->second.empty()) {
            mTopUidState = stateIt->first;
            break;
        }
    }

    ALOGV("%s: top uid state is %d", __FUNCTION__, mTopUidState);
}

int32_t TranscodingUidPolicy::getProcState_l(uid_t uid) {
    auto it = mUidStateMap.find(uid);
    if (it != mUidStateMap.end()) {
        return it->second;
    }
    return ActivityManager::PROCESS_STATE_UNKNOWN;
}

}  // namespace android
+1 −0
Original line number Diff line number Diff line
@@ -43,6 +43,7 @@ public:
    // TODO(chz): determine what parameters are needed here.
    virtual void onFinish(int64_t clientId, int32_t jobId) = 0;
    virtual void onError(int64_t clientId, int32_t jobId, TranscodingErrorCode err) = 0;
    virtual void onProgressUpdate(int64_t clientId, int32_t jobId, int32_t progress) = 0;

    // Called when transcoding becomes temporarily inaccessible due to loss of resource.
    // If there is any job currently running, it will be paused. When resource contention
Loading