Loading media/libmediatranscoding/Android.bp +3 −2 Original line number Diff line number Diff line Loading @@ -21,7 +21,7 @@ aidl_interface { srcs: [ "aidl/android/media/IMediaTranscodingService.aidl", "aidl/android/media/ITranscodingClient.aidl", "aidl/android/media/ITranscodingClientListener.aidl", "aidl/android/media/ITranscodingClientCallback.aidl", "aidl/android/media/TranscodingErrorCode.aidl", "aidl/android/media/TranscodingJobPriority.aidl", "aidl/android/media/TranscodingType.aidl", Loading @@ -36,7 +36,8 @@ cc_library_shared { name: "libmediatranscoding", srcs: [ "TranscodingClientManager.cpp" "TranscodingClientManager.cpp", "TranscodingJobScheduler.cpp", ], shared_libs: [ Loading media/libmediatranscoding/TranscodingClientManager.cpp +80 −64 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <android/binder_ibinder.h> #include <inttypes.h> #include <media/TranscodingClientManager.h> #include <media/TranscodingRequest.h> #include <utils/Log.h> namespace android { Loading @@ -37,94 +38,117 @@ using ::ndk::SpAIBinder; * ClientImpl implements a single client and contains all its information. */ struct TranscodingClientManager::ClientImpl : public BnTranscodingClient { /* The remote client listener that this ClientInfo is associated with. /* The remote client callback that this ClientInfo is associated with. * Once the ClientInfo is created, we hold an SpAIBinder so that the binder * object doesn't get created again, otherwise the binder object pointer * may not be unique. */ SpAIBinder mClientListener; SpAIBinder mClientCallback; /* A unique id assigned to the client by the service. This number is used * by the service for indexing. Here we use the binder object's pointer * (casted to int64t_t) as the client id. */ ClientIdType mClientId; int32_t mClientPid; int32_t mClientUid; pid_t mClientPid; uid_t mClientUid; std::string mClientName; std::string mClientOpPackageName; // Next jobId to assign std::atomic<std::int32_t> mNextJobId; // Pointer to the client manager for this client TranscodingClientManager* mOwner; ClientImpl(const std::shared_ptr<ITranscodingClientListener>& listener, int32_t pid, int32_t uid, const std::string& clientName, const std::string& opPackageName, ClientImpl(const std::shared_ptr<ITranscodingClientCallback>& callback, pid_t pid, uid_t uid, const std::string& clientName, const std::string& opPackageName, TranscodingClientManager* owner); Status submitRequest(const TranscodingRequestParcel& /*in_request*/, TranscodingJobParcel* /*out_job*/, int32_t* /*_aidl_return*/) override; TranscodingJobParcel* /*out_job*/, bool* /*_aidl_return*/) override; Status cancelJob(int32_t /*in_jobId*/, bool* /*_aidl_return*/) override; Status getJobWithId(int32_t /*in_jobId*/, TranscodingJobParcel* /*out_job*/, bool* /*_aidl_return*/) override; Status getJobWithId(int32_t /*in_jobId*/, TranscodingJobParcel* /*out_job*/, bool* /*_aidl_return*/) override; Status unregister() override; }; TranscodingClientManager::ClientImpl::ClientImpl( const std::shared_ptr<ITranscodingClientListener>& listener, int32_t pid, int32_t uid, const std::string& clientName, const std::string& opPackageName, const std::shared_ptr<ITranscodingClientCallback>& callback, pid_t pid, uid_t uid, const std::string& clientName, const std::string& opPackageName, TranscodingClientManager* owner) : mClientListener((listener != nullptr) ? listener->asBinder() : nullptr), mClientId((int64_t)mClientListener.get()), : mClientCallback((callback != nullptr) ? callback->asBinder() : nullptr), mClientId((int64_t)mClientCallback.get()), mClientPid(pid), mClientUid(uid), mClientName(clientName), mClientOpPackageName(opPackageName), mNextJobId(0), mOwner(owner) {} Status TranscodingClientManager::ClientImpl::submitRequest( const TranscodingRequestParcel& /*in_request*/, TranscodingJobParcel* /*out_job*/, int32_t* /*_aidl_return*/) { const TranscodingRequestParcel& in_request, TranscodingJobParcel* out_job, bool* _aidl_return) { if (in_request.fileName.empty()) { // This is the only error we check for now. *_aidl_return = false; return Status::ok(); } int32_t jobId = mNextJobId.fetch_add(1); *_aidl_return = mOwner->mJobScheduler->submit(mClientId, jobId, mClientPid, in_request, ITranscodingClientCallback::fromBinder(mClientCallback)); if (*_aidl_return) { out_job->jobId = jobId; // TODO(chz): is some of this coming from JobScheduler? *(TranscodingRequest*)&out_job->request = in_request; out_job->awaitNumberOfJobs = 0; } return Status::ok(); } Status TranscodingClientManager::ClientImpl::cancelJob( int32_t /*in_jobId*/, bool* /*_aidl_return*/) { Status TranscodingClientManager::ClientImpl::cancelJob(int32_t in_jobId, bool* _aidl_return) { *_aidl_return = mOwner->mJobScheduler->cancel(mClientId, in_jobId); return Status::ok(); } Status TranscodingClientManager::ClientImpl::getJobWithId(int32_t /*in_jobId*/, TranscodingJobParcel* /*out_job*/, bool* /*_aidl_return*/) { Status TranscodingClientManager::ClientImpl::getJobWithId(int32_t in_jobId, TranscodingJobParcel* out_job, bool* _aidl_return) { *_aidl_return = mOwner->mJobScheduler->getJob(mClientId, in_jobId, &out_job->request); if (*_aidl_return) { out_job->jobId = in_jobId; out_job->awaitNumberOfJobs = 0; } return Status::ok(); } Status TranscodingClientManager::ClientImpl::unregister() { // TODO(chz): Decide what to do about this client's jobs. // If app crashed, it could be relaunched later. Do we want to keep the // jobs around for that? mOwner->removeClient(mClientId); return Status::ok(); } /////////////////////////////////////////////////////////////////////////////// // static TranscodingClientManager& TranscodingClientManager::getInstance() { static TranscodingClientManager gInstance{}; return gInstance; } // static void TranscodingClientManager::BinderDiedCallback(void* cookie) { ClientIdType clientId = static_cast<ClientIdType>(reinterpret_cast<intptr_t>(cookie)); ALOGD("Client %lld is dead", (long long) clientId); // Don't check for pid validity since we know it's already dead. TranscodingClientManager& manager = TranscodingClientManager::getInstance(); manager.removeClient(clientId); ClientImpl* client = static_cast<ClientImpl*>(cookie); ALOGD("Client %lld is dead", (long long)client->mClientId); client->unregister(); } TranscodingClientManager::TranscodingClientManager() : mDeathRecipient(AIBinder_DeathRecipient_new(BinderDiedCallback)) { TranscodingClientManager::TranscodingClientManager( const std::shared_ptr<SchedulerClientInterface>& scheduler) : mDeathRecipient(AIBinder_DeathRecipient_new(BinderDiedCallback)), mJobScheduler(scheduler) { ALOGD("TranscodingClientManager started"); } Loading @@ -151,8 +175,8 @@ void TranscodingClientManager::dumpAllClients(int fd, const Vector<String16>& ar } for (const auto& iter : mClientIdToClientMap) { snprintf(buffer, SIZE, " -- Client id: %lld name: %s\n", (long long)iter.first, iter.second->mClientName.c_str()); snprintf(buffer, SIZE, " -- Client id: %lld name: %s\n", (long long)iter.first, iter.second->mClientName.c_str()); result.append(buffer); } Loading @@ -160,22 +184,18 @@ void TranscodingClientManager::dumpAllClients(int fd, const Vector<String16>& ar } status_t TranscodingClientManager::addClient( const std::shared_ptr<ITranscodingClientListener>& listener, int32_t pid, int32_t uid, const std::string& clientName, const std::string& opPackageName, const std::shared_ptr<ITranscodingClientCallback>& callback, pid_t pid, uid_t uid, const std::string& clientName, const std::string& opPackageName, std::shared_ptr<ITranscodingClient>* outClient) { // Validate the client. if (listener == nullptr || pid < 0 || uid < 0 || clientName.empty() || opPackageName.empty()) { if (callback == nullptr || pid < 0 || clientName.empty() || opPackageName.empty()) { ALOGE("Invalid client"); return BAD_VALUE; } // Creates the client and uses its process id as client id. std::shared_ptr<ClientImpl> client = ::ndk::SharedRefBase::make<ClientImpl>( listener, pid, uid, clientName, opPackageName, this); std::shared_ptr<ClientImpl> client = ::ndk::SharedRefBase::make<ClientImpl>( callback, pid, uid, clientName, opPackageName, this); std::scoped_lock lock{mLock}; Loading @@ -185,14 +205,11 @@ status_t TranscodingClientManager::addClient( } ALOGD("Adding client id %lld, pid %d, uid %d, name %s, package %s", (long long)client->mClientId, client->mClientPid, client->mClientUid, client->mClientName.c_str(), client->mClientOpPackageName.c_str()); (long long)client->mClientId, client->mClientPid, client->mClientUid, client->mClientName.c_str(), client->mClientOpPackageName.c_str()); AIBinder_linkToDeath(client->mClientListener.get(), mDeathRecipient.get(), reinterpret_cast<void*>(client->mClientId)); AIBinder_linkToDeath(client->mClientCallback.get(), mDeathRecipient.get(), reinterpret_cast<void*>(client.get())); // Adds the new client to the map. mClientIdToClientMap[client->mClientId] = client; Loading @@ -202,7 +219,6 @@ status_t TranscodingClientManager::addClient( return OK; } status_t TranscodingClientManager::removeClient(ClientIdType clientId) { ALOGD("Removing client id %lld", (long long)clientId); std::scoped_lock lock{mLock}; Loading @@ -214,12 +230,12 @@ status_t TranscodingClientManager::removeClient(ClientIdType clientId) { return INVALID_OPERATION; } SpAIBinder listener = it->second->mClientListener; SpAIBinder callback = it->second->mClientCallback; // Check if the client still live. If alive, unlink the death. if (listener.get() != nullptr) { AIBinder_unlinkToDeath(listener.get(), mDeathRecipient.get(), reinterpret_cast<void*>(clientId)); if (callback.get() != nullptr) { AIBinder_unlinkToDeath(callback.get(), mDeathRecipient.get(), reinterpret_cast<void*>(it->second.get())); } // Erase the entry. Loading media/libmediatranscoding/TranscodingJobScheduler.cpp 0 → 100644 +369 −0 Original line number Diff line number Diff line /* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // #define LOG_NDEBUG 0 #define LOG_TAG "TranscodingJobScheduler" #define VALIDATE_STATE 1 #include <inttypes.h> #include <media/TranscodingJobScheduler.h> #include <utils/Log.h> #include <utility> namespace android { constexpr static pid_t OFFLINE_PID = -1; //static String8 TranscodingJobScheduler::jobToString(const JobKeyType& jobKey) { return String8::format("{client:%lld, job:%d}", (long long)jobKey.first, jobKey.second); } TranscodingJobScheduler::TranscodingJobScheduler( const std::shared_ptr<TranscoderInterface>& transcoder, const std::shared_ptr<ProcessInfoInterface>& procInfo) : mTranscoder(transcoder), mProcInfo(procInfo), mCurrentJob(nullptr), mResourceLost(false) { // Only push empty offline queue initially. Realtime queues are added when requests come in. mPidSortedList.push_back(OFFLINE_PID); mOfflinePidIterator = mPidSortedList.begin(); mJobQueues.emplace(OFFLINE_PID, JobQueueType()); } TranscodingJobScheduler::~TranscodingJobScheduler() {} TranscodingJobScheduler::Job* TranscodingJobScheduler::getTopJob_l() { if (mJobMap.empty()) { return nullptr; } pid_t topPid = *mPidSortedList.begin(); JobKeyType topJobKey = *mJobQueues[topPid].begin(); return &mJobMap[topJobKey]; } void TranscodingJobScheduler::updateCurrentJob_l() { Job* topJob = getTopJob_l(); Job* curJob = mCurrentJob; ALOGV("updateCurrentJob: topJob is %s, curJob is %s", topJob == nullptr ? "null" : jobToString(topJob->key).c_str(), curJob == nullptr ? "null" : jobToString(curJob->key).c_str()); // If we found a topJob that should be run, and it's not already running, // take some actions to ensure it's running. if (topJob != nullptr && (topJob != curJob || topJob->state != Job::RUNNING)) { // If another job is currently running, pause it first. if (curJob != nullptr && curJob->state == Job::RUNNING) { mTranscoder->pause(curJob->key.first, curJob->key.second); curJob->state = Job::PAUSED; } // If we are not experiencing resource loss, we can start or resume // the topJob now. if (!mResourceLost) { if (topJob->state == Job::NOT_STARTED) { mTranscoder->start(topJob->key.first, topJob->key.second); } else if (topJob->state == Job::PAUSED) { mTranscoder->resume(topJob->key.first, topJob->key.second); } topJob->state = Job::RUNNING; } } mCurrentJob = topJob; } void TranscodingJobScheduler::removeJob_l(const JobKeyType& jobKey) { ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str()); if (mJobMap.count(jobKey) == 0) { ALOGE("job %s doesn't exist", jobToString(jobKey).c_str()); return; } // Remove job from pid's queue. const pid_t pid = mJobMap[jobKey].pid; JobQueueType& jobQueue = mJobQueues[pid]; auto it = std::find(jobQueue.begin(), jobQueue.end(), jobKey); if (it == jobQueue.end()) { ALOGE("couldn't find job %s in queue for pid %d", jobToString(jobKey).c_str(), pid); return; } jobQueue.erase(it); // If this is the last job in a real-time queue, remove this pid's queue. if (pid != OFFLINE_PID && jobQueue.empty()) { mPidSortedList.remove(pid); mJobQueues.erase(pid); } // Clear current job. if (mCurrentJob == &mJobMap[jobKey]) { mCurrentJob = nullptr; } // Remove job from job map. mJobMap.erase(jobKey); } bool TranscodingJobScheduler::submit(ClientIdType clientId, int32_t jobId, pid_t pid, const TranscodingRequestParcel& request, const std::weak_ptr<ITranscodingClientCallback>& callback) { JobKeyType jobKey = std::make_pair(clientId, jobId); ALOGV("%s: job %s, pid %d, prioirty %d", __FUNCTION__, jobToString(jobKey).c_str(), pid, (int32_t)request.priority); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) > 0) { ALOGE("job %s already exists", jobToString(jobKey).c_str()); return false; } // TODO(chz): only support offline vs real-time for now. All kUnspecified jobs // go to offline queue. if (request.priority == TranscodingJobPriority::kUnspecified) { pid = OFFLINE_PID; } // Add job to job map. mJobMap[jobKey].key = jobKey; mJobMap[jobKey].pid = pid; mJobMap[jobKey].state = Job::NOT_STARTED; mJobMap[jobKey].request = request; mJobMap[jobKey].callback = callback; // If it's an offline job, the queue was already added in constructor. // If it's a real-time jobs, check if a queue is already present for the pid, // and add a new queue if needed. if (pid != OFFLINE_PID) { if (mJobQueues.count(pid) == 0) { if (mProcInfo->isProcessOnTop(pid)) { mPidSortedList.push_front(pid); } else { // Shouldn't be submitting real-time requests from non-top app, // put it in front of the offline queue. mPidSortedList.insert(mOfflinePidIterator, pid); } } else if (pid != *mPidSortedList.begin()) { if (mProcInfo->isProcessOnTop(pid)) { mPidSortedList.remove(pid); mPidSortedList.push_front(pid); } } } // Append this job to the pid's queue. mJobQueues[pid].push_back(jobKey); updateCurrentJob_l(); validateState_l(); return true; } bool TranscodingJobScheduler::cancel(ClientIdType clientId, int32_t jobId) { JobKeyType jobKey = std::make_pair(clientId, jobId); ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str()); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) == 0) { ALOGE("job %s doesn't exist", jobToString(jobKey).c_str()); return false; } // If the job is running, pause it first. if (mJobMap[jobKey].state == Job::RUNNING) { mTranscoder->pause(clientId, jobId); } // Remove the job. removeJob_l(jobKey); // Start next job. updateCurrentJob_l(); validateState_l(); return true; } bool TranscodingJobScheduler::getJob(ClientIdType clientId, int32_t jobId, TranscodingRequestParcel* request) { JobKeyType jobKey = std::make_pair(clientId, jobId); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) == 0) { ALOGE("job %s doesn't exist", jobToString(jobKey).c_str()); return false; } *(TranscodingRequest*)request = mJobMap[jobKey].request; return true; } void TranscodingJobScheduler::onFinish(ClientIdType clientId, int32_t jobId) { JobKeyType jobKey = std::make_pair(clientId, jobId); ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str()); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) == 0) { ALOGW("ignoring abort for non-existent job"); return; } // Only ignore if job was never started. In particular, propagate the status // to client if the job is paused. Transcoder could have posted finish when // we're pausing it, and the finish arrived after we changed current job. if (mJobMap[jobKey].state == Job::NOT_STARTED) { ALOGW("ignoring abort for job that was never started"); return; } { auto clientCallback = mJobMap[jobKey].callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingFinished(jobId, TranscodingResultParcel({jobId, 0})); } } // Remove the job. removeJob_l(jobKey); // Start next job. updateCurrentJob_l(); validateState_l(); } void TranscodingJobScheduler::onError(int64_t clientId, int32_t jobId, TranscodingErrorCode err) { JobKeyType jobKey = std::make_pair(clientId, jobId); ALOGV("%s: job %s, err %d", __FUNCTION__, jobToString(jobKey).c_str(), (int32_t)err); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) == 0) { ALOGW("ignoring abort for non-existent job"); return; } // Only ignore if job was never started. In particular, propagate the status // to client if the job is paused. Transcoder could have posted finish when // we're pausing it, and the finish arrived after we changed current job. if (mJobMap[jobKey].state == Job::NOT_STARTED) { ALOGW("ignoring abort for job that was never started"); return; } { auto clientCallback = mJobMap[jobKey].callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingFailed(jobId, err); } } // Remove the job. removeJob_l(jobKey); // Start next job. updateCurrentJob_l(); validateState_l(); } void TranscodingJobScheduler::onResourceLost() { ALOGV("%s", __FUNCTION__); std::scoped_lock lock{mLock}; // If we receive a resource loss event, the TranscoderLibrary already paused // the transcoding, so we don't need to call onPaused to notify it to pause. // Only need to update the job state here. if (mCurrentJob != nullptr && mCurrentJob->state == Job::RUNNING) { mCurrentJob->state = Job::PAUSED; } mResourceLost = true; validateState_l(); } void TranscodingJobScheduler::onTopProcessChanged(pid_t pid) { ALOGV("%s: pid %d", __FUNCTION__, pid); std::scoped_lock lock{mLock}; if (pid < 0) { ALOGW("bringProcessToTop: ignoring invalid pid %d", pid); return; } // If this pid doesn't have any jobs, we don't care about it. if (mJobQueues.count(pid) == 0) { ALOGW("bringProcessToTop: ignoring pid %d without any jobs", pid); return; } // If this pid is already top, don't do anything. if (pid == *mPidSortedList.begin()) { ALOGW("pid %d is already top", pid); return; } mPidSortedList.remove(pid); mPidSortedList.push_front(pid); updateCurrentJob_l(); validateState_l(); } void TranscodingJobScheduler::onResourceAvailable() { ALOGV("%s", __FUNCTION__); std::scoped_lock lock{mLock}; mResourceLost = false; updateCurrentJob_l(); validateState_l(); } void TranscodingJobScheduler::validateState_l() { #ifdef VALIDATE_STATE LOG_ALWAYS_FATAL_IF(mJobQueues.count(OFFLINE_PID) != 1, "mJobQueues offline queue number is not 1"); LOG_ALWAYS_FATAL_IF(*mOfflinePidIterator != OFFLINE_PID, "mOfflinePidIterator not pointing to offline pid"); LOG_ALWAYS_FATAL_IF(mPidSortedList.size() != mJobQueues.size(), "mPidList and mJobQueues size mismatch"); int32_t totalJobs = 0; for (auto pidIt = mPidSortedList.begin(); pidIt != mPidSortedList.end(); pidIt++) { LOG_ALWAYS_FATAL_IF(mJobQueues.count(*pidIt) != 1, "mJobQueues count for pid %d is not 1", *pidIt); for (auto jobIt = mJobQueues[*pidIt].begin(); jobIt != mJobQueues[*pidIt].end(); jobIt++) { LOG_ALWAYS_FATAL_IF(mJobMap.count(*jobIt) != 1, "mJobs count for job %s is not 1", jobToString(*jobIt).c_str()); } totalJobs += mJobQueues[*pidIt].size(); } LOG_ALWAYS_FATAL_IF(mJobMap.size() != totalJobs, "mJobs size doesn't match total jobs counted from pid queues"); #endif // VALIDATE_STATE } } // namespace android media/libmediatranscoding/aidl/android/media/IMediaTranscodingService.aidl +3 −3 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ package android.media; import android.media.ITranscodingClient; import android.media.ITranscodingClientListener; import android.media.ITranscodingClientCallback; import android.media.TranscodingJobParcel; import android.media.TranscodingRequestParcel; Loading Loading @@ -54,7 +54,7 @@ interface IMediaTranscodingService { * ITranscodingClient interface object. The client should save and use it * for all future transactions with the service. * * @param listener client interface for the MediaTranscodingService to call * @param callback client interface for the MediaTranscodingService to call * the client. * @param clientName name of the client. * @param opPackageName op package name of the client. Loading @@ -64,7 +64,7 @@ interface IMediaTranscodingService { * failure to register. */ ITranscodingClient registerClient( in ITranscodingClientListener listener, in ITranscodingClientCallback callback, in String clientName, in String opPackageName, in int clientUid, Loading media/libmediatranscoding/aidl/android/media/ITranscodingClient.aidl +3 −4 Original line number Diff line number Diff line Loading @@ -32,10 +32,9 @@ interface ITranscodingClient { * * @param request a TranscodingRequest contains transcoding configuration. * @param job(output variable) a TranscodingJob generated by the MediaTranscodingService. * @return a unique positive jobId for the client generated by the * MediaTranscodingService, -1 means failure. * @return true if success, false otherwise. */ int submitRequest(in TranscodingRequestParcel request, boolean submitRequest(in TranscodingRequestParcel request, out TranscodingJobParcel job); /** Loading Loading
media/libmediatranscoding/Android.bp +3 −2 Original line number Diff line number Diff line Loading @@ -21,7 +21,7 @@ aidl_interface { srcs: [ "aidl/android/media/IMediaTranscodingService.aidl", "aidl/android/media/ITranscodingClient.aidl", "aidl/android/media/ITranscodingClientListener.aidl", "aidl/android/media/ITranscodingClientCallback.aidl", "aidl/android/media/TranscodingErrorCode.aidl", "aidl/android/media/TranscodingJobPriority.aidl", "aidl/android/media/TranscodingType.aidl", Loading @@ -36,7 +36,8 @@ cc_library_shared { name: "libmediatranscoding", srcs: [ "TranscodingClientManager.cpp" "TranscodingClientManager.cpp", "TranscodingJobScheduler.cpp", ], shared_libs: [ Loading
media/libmediatranscoding/TranscodingClientManager.cpp +80 −64 Original line number Diff line number Diff line Loading @@ -21,6 +21,7 @@ #include <android/binder_ibinder.h> #include <inttypes.h> #include <media/TranscodingClientManager.h> #include <media/TranscodingRequest.h> #include <utils/Log.h> namespace android { Loading @@ -37,94 +38,117 @@ using ::ndk::SpAIBinder; * ClientImpl implements a single client and contains all its information. */ struct TranscodingClientManager::ClientImpl : public BnTranscodingClient { /* The remote client listener that this ClientInfo is associated with. /* The remote client callback that this ClientInfo is associated with. * Once the ClientInfo is created, we hold an SpAIBinder so that the binder * object doesn't get created again, otherwise the binder object pointer * may not be unique. */ SpAIBinder mClientListener; SpAIBinder mClientCallback; /* A unique id assigned to the client by the service. This number is used * by the service for indexing. Here we use the binder object's pointer * (casted to int64t_t) as the client id. */ ClientIdType mClientId; int32_t mClientPid; int32_t mClientUid; pid_t mClientPid; uid_t mClientUid; std::string mClientName; std::string mClientOpPackageName; // Next jobId to assign std::atomic<std::int32_t> mNextJobId; // Pointer to the client manager for this client TranscodingClientManager* mOwner; ClientImpl(const std::shared_ptr<ITranscodingClientListener>& listener, int32_t pid, int32_t uid, const std::string& clientName, const std::string& opPackageName, ClientImpl(const std::shared_ptr<ITranscodingClientCallback>& callback, pid_t pid, uid_t uid, const std::string& clientName, const std::string& opPackageName, TranscodingClientManager* owner); Status submitRequest(const TranscodingRequestParcel& /*in_request*/, TranscodingJobParcel* /*out_job*/, int32_t* /*_aidl_return*/) override; TranscodingJobParcel* /*out_job*/, bool* /*_aidl_return*/) override; Status cancelJob(int32_t /*in_jobId*/, bool* /*_aidl_return*/) override; Status getJobWithId(int32_t /*in_jobId*/, TranscodingJobParcel* /*out_job*/, bool* /*_aidl_return*/) override; Status getJobWithId(int32_t /*in_jobId*/, TranscodingJobParcel* /*out_job*/, bool* /*_aidl_return*/) override; Status unregister() override; }; TranscodingClientManager::ClientImpl::ClientImpl( const std::shared_ptr<ITranscodingClientListener>& listener, int32_t pid, int32_t uid, const std::string& clientName, const std::string& opPackageName, const std::shared_ptr<ITranscodingClientCallback>& callback, pid_t pid, uid_t uid, const std::string& clientName, const std::string& opPackageName, TranscodingClientManager* owner) : mClientListener((listener != nullptr) ? listener->asBinder() : nullptr), mClientId((int64_t)mClientListener.get()), : mClientCallback((callback != nullptr) ? callback->asBinder() : nullptr), mClientId((int64_t)mClientCallback.get()), mClientPid(pid), mClientUid(uid), mClientName(clientName), mClientOpPackageName(opPackageName), mNextJobId(0), mOwner(owner) {} Status TranscodingClientManager::ClientImpl::submitRequest( const TranscodingRequestParcel& /*in_request*/, TranscodingJobParcel* /*out_job*/, int32_t* /*_aidl_return*/) { const TranscodingRequestParcel& in_request, TranscodingJobParcel* out_job, bool* _aidl_return) { if (in_request.fileName.empty()) { // This is the only error we check for now. *_aidl_return = false; return Status::ok(); } int32_t jobId = mNextJobId.fetch_add(1); *_aidl_return = mOwner->mJobScheduler->submit(mClientId, jobId, mClientPid, in_request, ITranscodingClientCallback::fromBinder(mClientCallback)); if (*_aidl_return) { out_job->jobId = jobId; // TODO(chz): is some of this coming from JobScheduler? *(TranscodingRequest*)&out_job->request = in_request; out_job->awaitNumberOfJobs = 0; } return Status::ok(); } Status TranscodingClientManager::ClientImpl::cancelJob( int32_t /*in_jobId*/, bool* /*_aidl_return*/) { Status TranscodingClientManager::ClientImpl::cancelJob(int32_t in_jobId, bool* _aidl_return) { *_aidl_return = mOwner->mJobScheduler->cancel(mClientId, in_jobId); return Status::ok(); } Status TranscodingClientManager::ClientImpl::getJobWithId(int32_t /*in_jobId*/, TranscodingJobParcel* /*out_job*/, bool* /*_aidl_return*/) { Status TranscodingClientManager::ClientImpl::getJobWithId(int32_t in_jobId, TranscodingJobParcel* out_job, bool* _aidl_return) { *_aidl_return = mOwner->mJobScheduler->getJob(mClientId, in_jobId, &out_job->request); if (*_aidl_return) { out_job->jobId = in_jobId; out_job->awaitNumberOfJobs = 0; } return Status::ok(); } Status TranscodingClientManager::ClientImpl::unregister() { // TODO(chz): Decide what to do about this client's jobs. // If app crashed, it could be relaunched later. Do we want to keep the // jobs around for that? mOwner->removeClient(mClientId); return Status::ok(); } /////////////////////////////////////////////////////////////////////////////// // static TranscodingClientManager& TranscodingClientManager::getInstance() { static TranscodingClientManager gInstance{}; return gInstance; } // static void TranscodingClientManager::BinderDiedCallback(void* cookie) { ClientIdType clientId = static_cast<ClientIdType>(reinterpret_cast<intptr_t>(cookie)); ALOGD("Client %lld is dead", (long long) clientId); // Don't check for pid validity since we know it's already dead. TranscodingClientManager& manager = TranscodingClientManager::getInstance(); manager.removeClient(clientId); ClientImpl* client = static_cast<ClientImpl*>(cookie); ALOGD("Client %lld is dead", (long long)client->mClientId); client->unregister(); } TranscodingClientManager::TranscodingClientManager() : mDeathRecipient(AIBinder_DeathRecipient_new(BinderDiedCallback)) { TranscodingClientManager::TranscodingClientManager( const std::shared_ptr<SchedulerClientInterface>& scheduler) : mDeathRecipient(AIBinder_DeathRecipient_new(BinderDiedCallback)), mJobScheduler(scheduler) { ALOGD("TranscodingClientManager started"); } Loading @@ -151,8 +175,8 @@ void TranscodingClientManager::dumpAllClients(int fd, const Vector<String16>& ar } for (const auto& iter : mClientIdToClientMap) { snprintf(buffer, SIZE, " -- Client id: %lld name: %s\n", (long long)iter.first, iter.second->mClientName.c_str()); snprintf(buffer, SIZE, " -- Client id: %lld name: %s\n", (long long)iter.first, iter.second->mClientName.c_str()); result.append(buffer); } Loading @@ -160,22 +184,18 @@ void TranscodingClientManager::dumpAllClients(int fd, const Vector<String16>& ar } status_t TranscodingClientManager::addClient( const std::shared_ptr<ITranscodingClientListener>& listener, int32_t pid, int32_t uid, const std::string& clientName, const std::string& opPackageName, const std::shared_ptr<ITranscodingClientCallback>& callback, pid_t pid, uid_t uid, const std::string& clientName, const std::string& opPackageName, std::shared_ptr<ITranscodingClient>* outClient) { // Validate the client. if (listener == nullptr || pid < 0 || uid < 0 || clientName.empty() || opPackageName.empty()) { if (callback == nullptr || pid < 0 || clientName.empty() || opPackageName.empty()) { ALOGE("Invalid client"); return BAD_VALUE; } // Creates the client and uses its process id as client id. std::shared_ptr<ClientImpl> client = ::ndk::SharedRefBase::make<ClientImpl>( listener, pid, uid, clientName, opPackageName, this); std::shared_ptr<ClientImpl> client = ::ndk::SharedRefBase::make<ClientImpl>( callback, pid, uid, clientName, opPackageName, this); std::scoped_lock lock{mLock}; Loading @@ -185,14 +205,11 @@ status_t TranscodingClientManager::addClient( } ALOGD("Adding client id %lld, pid %d, uid %d, name %s, package %s", (long long)client->mClientId, client->mClientPid, client->mClientUid, client->mClientName.c_str(), client->mClientOpPackageName.c_str()); (long long)client->mClientId, client->mClientPid, client->mClientUid, client->mClientName.c_str(), client->mClientOpPackageName.c_str()); AIBinder_linkToDeath(client->mClientListener.get(), mDeathRecipient.get(), reinterpret_cast<void*>(client->mClientId)); AIBinder_linkToDeath(client->mClientCallback.get(), mDeathRecipient.get(), reinterpret_cast<void*>(client.get())); // Adds the new client to the map. mClientIdToClientMap[client->mClientId] = client; Loading @@ -202,7 +219,6 @@ status_t TranscodingClientManager::addClient( return OK; } status_t TranscodingClientManager::removeClient(ClientIdType clientId) { ALOGD("Removing client id %lld", (long long)clientId); std::scoped_lock lock{mLock}; Loading @@ -214,12 +230,12 @@ status_t TranscodingClientManager::removeClient(ClientIdType clientId) { return INVALID_OPERATION; } SpAIBinder listener = it->second->mClientListener; SpAIBinder callback = it->second->mClientCallback; // Check if the client still live. If alive, unlink the death. if (listener.get() != nullptr) { AIBinder_unlinkToDeath(listener.get(), mDeathRecipient.get(), reinterpret_cast<void*>(clientId)); if (callback.get() != nullptr) { AIBinder_unlinkToDeath(callback.get(), mDeathRecipient.get(), reinterpret_cast<void*>(it->second.get())); } // Erase the entry. Loading
media/libmediatranscoding/TranscodingJobScheduler.cpp 0 → 100644 +369 −0 Original line number Diff line number Diff line /* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ // #define LOG_NDEBUG 0 #define LOG_TAG "TranscodingJobScheduler" #define VALIDATE_STATE 1 #include <inttypes.h> #include <media/TranscodingJobScheduler.h> #include <utils/Log.h> #include <utility> namespace android { constexpr static pid_t OFFLINE_PID = -1; //static String8 TranscodingJobScheduler::jobToString(const JobKeyType& jobKey) { return String8::format("{client:%lld, job:%d}", (long long)jobKey.first, jobKey.second); } TranscodingJobScheduler::TranscodingJobScheduler( const std::shared_ptr<TranscoderInterface>& transcoder, const std::shared_ptr<ProcessInfoInterface>& procInfo) : mTranscoder(transcoder), mProcInfo(procInfo), mCurrentJob(nullptr), mResourceLost(false) { // Only push empty offline queue initially. Realtime queues are added when requests come in. mPidSortedList.push_back(OFFLINE_PID); mOfflinePidIterator = mPidSortedList.begin(); mJobQueues.emplace(OFFLINE_PID, JobQueueType()); } TranscodingJobScheduler::~TranscodingJobScheduler() {} TranscodingJobScheduler::Job* TranscodingJobScheduler::getTopJob_l() { if (mJobMap.empty()) { return nullptr; } pid_t topPid = *mPidSortedList.begin(); JobKeyType topJobKey = *mJobQueues[topPid].begin(); return &mJobMap[topJobKey]; } void TranscodingJobScheduler::updateCurrentJob_l() { Job* topJob = getTopJob_l(); Job* curJob = mCurrentJob; ALOGV("updateCurrentJob: topJob is %s, curJob is %s", topJob == nullptr ? "null" : jobToString(topJob->key).c_str(), curJob == nullptr ? "null" : jobToString(curJob->key).c_str()); // If we found a topJob that should be run, and it's not already running, // take some actions to ensure it's running. if (topJob != nullptr && (topJob != curJob || topJob->state != Job::RUNNING)) { // If another job is currently running, pause it first. if (curJob != nullptr && curJob->state == Job::RUNNING) { mTranscoder->pause(curJob->key.first, curJob->key.second); curJob->state = Job::PAUSED; } // If we are not experiencing resource loss, we can start or resume // the topJob now. if (!mResourceLost) { if (topJob->state == Job::NOT_STARTED) { mTranscoder->start(topJob->key.first, topJob->key.second); } else if (topJob->state == Job::PAUSED) { mTranscoder->resume(topJob->key.first, topJob->key.second); } topJob->state = Job::RUNNING; } } mCurrentJob = topJob; } void TranscodingJobScheduler::removeJob_l(const JobKeyType& jobKey) { ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str()); if (mJobMap.count(jobKey) == 0) { ALOGE("job %s doesn't exist", jobToString(jobKey).c_str()); return; } // Remove job from pid's queue. const pid_t pid = mJobMap[jobKey].pid; JobQueueType& jobQueue = mJobQueues[pid]; auto it = std::find(jobQueue.begin(), jobQueue.end(), jobKey); if (it == jobQueue.end()) { ALOGE("couldn't find job %s in queue for pid %d", jobToString(jobKey).c_str(), pid); return; } jobQueue.erase(it); // If this is the last job in a real-time queue, remove this pid's queue. if (pid != OFFLINE_PID && jobQueue.empty()) { mPidSortedList.remove(pid); mJobQueues.erase(pid); } // Clear current job. if (mCurrentJob == &mJobMap[jobKey]) { mCurrentJob = nullptr; } // Remove job from job map. mJobMap.erase(jobKey); } bool TranscodingJobScheduler::submit(ClientIdType clientId, int32_t jobId, pid_t pid, const TranscodingRequestParcel& request, const std::weak_ptr<ITranscodingClientCallback>& callback) { JobKeyType jobKey = std::make_pair(clientId, jobId); ALOGV("%s: job %s, pid %d, prioirty %d", __FUNCTION__, jobToString(jobKey).c_str(), pid, (int32_t)request.priority); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) > 0) { ALOGE("job %s already exists", jobToString(jobKey).c_str()); return false; } // TODO(chz): only support offline vs real-time for now. All kUnspecified jobs // go to offline queue. if (request.priority == TranscodingJobPriority::kUnspecified) { pid = OFFLINE_PID; } // Add job to job map. mJobMap[jobKey].key = jobKey; mJobMap[jobKey].pid = pid; mJobMap[jobKey].state = Job::NOT_STARTED; mJobMap[jobKey].request = request; mJobMap[jobKey].callback = callback; // If it's an offline job, the queue was already added in constructor. // If it's a real-time jobs, check if a queue is already present for the pid, // and add a new queue if needed. if (pid != OFFLINE_PID) { if (mJobQueues.count(pid) == 0) { if (mProcInfo->isProcessOnTop(pid)) { mPidSortedList.push_front(pid); } else { // Shouldn't be submitting real-time requests from non-top app, // put it in front of the offline queue. mPidSortedList.insert(mOfflinePidIterator, pid); } } else if (pid != *mPidSortedList.begin()) { if (mProcInfo->isProcessOnTop(pid)) { mPidSortedList.remove(pid); mPidSortedList.push_front(pid); } } } // Append this job to the pid's queue. mJobQueues[pid].push_back(jobKey); updateCurrentJob_l(); validateState_l(); return true; } bool TranscodingJobScheduler::cancel(ClientIdType clientId, int32_t jobId) { JobKeyType jobKey = std::make_pair(clientId, jobId); ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str()); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) == 0) { ALOGE("job %s doesn't exist", jobToString(jobKey).c_str()); return false; } // If the job is running, pause it first. if (mJobMap[jobKey].state == Job::RUNNING) { mTranscoder->pause(clientId, jobId); } // Remove the job. removeJob_l(jobKey); // Start next job. updateCurrentJob_l(); validateState_l(); return true; } bool TranscodingJobScheduler::getJob(ClientIdType clientId, int32_t jobId, TranscodingRequestParcel* request) { JobKeyType jobKey = std::make_pair(clientId, jobId); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) == 0) { ALOGE("job %s doesn't exist", jobToString(jobKey).c_str()); return false; } *(TranscodingRequest*)request = mJobMap[jobKey].request; return true; } void TranscodingJobScheduler::onFinish(ClientIdType clientId, int32_t jobId) { JobKeyType jobKey = std::make_pair(clientId, jobId); ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str()); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) == 0) { ALOGW("ignoring abort for non-existent job"); return; } // Only ignore if job was never started. In particular, propagate the status // to client if the job is paused. Transcoder could have posted finish when // we're pausing it, and the finish arrived after we changed current job. if (mJobMap[jobKey].state == Job::NOT_STARTED) { ALOGW("ignoring abort for job that was never started"); return; } { auto clientCallback = mJobMap[jobKey].callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingFinished(jobId, TranscodingResultParcel({jobId, 0})); } } // Remove the job. removeJob_l(jobKey); // Start next job. updateCurrentJob_l(); validateState_l(); } void TranscodingJobScheduler::onError(int64_t clientId, int32_t jobId, TranscodingErrorCode err) { JobKeyType jobKey = std::make_pair(clientId, jobId); ALOGV("%s: job %s, err %d", __FUNCTION__, jobToString(jobKey).c_str(), (int32_t)err); std::scoped_lock lock{mLock}; if (mJobMap.count(jobKey) == 0) { ALOGW("ignoring abort for non-existent job"); return; } // Only ignore if job was never started. In particular, propagate the status // to client if the job is paused. Transcoder could have posted finish when // we're pausing it, and the finish arrived after we changed current job. if (mJobMap[jobKey].state == Job::NOT_STARTED) { ALOGW("ignoring abort for job that was never started"); return; } { auto clientCallback = mJobMap[jobKey].callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingFailed(jobId, err); } } // Remove the job. removeJob_l(jobKey); // Start next job. updateCurrentJob_l(); validateState_l(); } void TranscodingJobScheduler::onResourceLost() { ALOGV("%s", __FUNCTION__); std::scoped_lock lock{mLock}; // If we receive a resource loss event, the TranscoderLibrary already paused // the transcoding, so we don't need to call onPaused to notify it to pause. // Only need to update the job state here. if (mCurrentJob != nullptr && mCurrentJob->state == Job::RUNNING) { mCurrentJob->state = Job::PAUSED; } mResourceLost = true; validateState_l(); } void TranscodingJobScheduler::onTopProcessChanged(pid_t pid) { ALOGV("%s: pid %d", __FUNCTION__, pid); std::scoped_lock lock{mLock}; if (pid < 0) { ALOGW("bringProcessToTop: ignoring invalid pid %d", pid); return; } // If this pid doesn't have any jobs, we don't care about it. if (mJobQueues.count(pid) == 0) { ALOGW("bringProcessToTop: ignoring pid %d without any jobs", pid); return; } // If this pid is already top, don't do anything. if (pid == *mPidSortedList.begin()) { ALOGW("pid %d is already top", pid); return; } mPidSortedList.remove(pid); mPidSortedList.push_front(pid); updateCurrentJob_l(); validateState_l(); } void TranscodingJobScheduler::onResourceAvailable() { ALOGV("%s", __FUNCTION__); std::scoped_lock lock{mLock}; mResourceLost = false; updateCurrentJob_l(); validateState_l(); } void TranscodingJobScheduler::validateState_l() { #ifdef VALIDATE_STATE LOG_ALWAYS_FATAL_IF(mJobQueues.count(OFFLINE_PID) != 1, "mJobQueues offline queue number is not 1"); LOG_ALWAYS_FATAL_IF(*mOfflinePidIterator != OFFLINE_PID, "mOfflinePidIterator not pointing to offline pid"); LOG_ALWAYS_FATAL_IF(mPidSortedList.size() != mJobQueues.size(), "mPidList and mJobQueues size mismatch"); int32_t totalJobs = 0; for (auto pidIt = mPidSortedList.begin(); pidIt != mPidSortedList.end(); pidIt++) { LOG_ALWAYS_FATAL_IF(mJobQueues.count(*pidIt) != 1, "mJobQueues count for pid %d is not 1", *pidIt); for (auto jobIt = mJobQueues[*pidIt].begin(); jobIt != mJobQueues[*pidIt].end(); jobIt++) { LOG_ALWAYS_FATAL_IF(mJobMap.count(*jobIt) != 1, "mJobs count for job %s is not 1", jobToString(*jobIt).c_str()); } totalJobs += mJobQueues[*pidIt].size(); } LOG_ALWAYS_FATAL_IF(mJobMap.size() != totalJobs, "mJobs size doesn't match total jobs counted from pid queues"); #endif // VALIDATE_STATE } } // namespace android
media/libmediatranscoding/aidl/android/media/IMediaTranscodingService.aidl +3 −3 Original line number Diff line number Diff line Loading @@ -17,7 +17,7 @@ package android.media; import android.media.ITranscodingClient; import android.media.ITranscodingClientListener; import android.media.ITranscodingClientCallback; import android.media.TranscodingJobParcel; import android.media.TranscodingRequestParcel; Loading Loading @@ -54,7 +54,7 @@ interface IMediaTranscodingService { * ITranscodingClient interface object. The client should save and use it * for all future transactions with the service. * * @param listener client interface for the MediaTranscodingService to call * @param callback client interface for the MediaTranscodingService to call * the client. * @param clientName name of the client. * @param opPackageName op package name of the client. Loading @@ -64,7 +64,7 @@ interface IMediaTranscodingService { * failure to register. */ ITranscodingClient registerClient( in ITranscodingClientListener listener, in ITranscodingClientCallback callback, in String clientName, in String opPackageName, in int clientUid, Loading
media/libmediatranscoding/aidl/android/media/ITranscodingClient.aidl +3 −4 Original line number Diff line number Diff line Loading @@ -32,10 +32,9 @@ interface ITranscodingClient { * * @param request a TranscodingRequest contains transcoding configuration. * @param job(output variable) a TranscodingJob generated by the MediaTranscodingService. * @return a unique positive jobId for the client generated by the * MediaTranscodingService, -1 means failure. * @return true if success, false otherwise. */ int submitRequest(in TranscodingRequestParcel request, boolean submitRequest(in TranscodingRequestParcel request, out TranscodingJobParcel job); /** Loading