Loading media/libmediatranscoding/Android.bp +5 −0 Original line number Original line Diff line number Diff line Loading @@ -54,6 +54,7 @@ cc_library_shared { srcs: [ srcs: [ "TranscodingClientManager.cpp", "TranscodingClientManager.cpp", "TranscodingJobScheduler.cpp", "TranscodingJobScheduler.cpp", "TranscodingResourcePolicy.cpp", "TranscodingUidPolicy.cpp", "TranscodingUidPolicy.cpp", "TranscoderWrapper.cpp", "TranscoderWrapper.cpp", ], ], Loading @@ -67,12 +68,16 @@ cc_library_shared { "libbinder", "libbinder", "libmediandk", "libmediandk", ], ], export_shared_lib_headers: [ "libmediandk", ], export_include_dirs: ["include"], export_include_dirs: ["include"], static_libs: [ static_libs: [ "mediatranscoding_aidl_interface-ndk_platform", "mediatranscoding_aidl_interface-ndk_platform", "resourcemanager_aidl_interface-ndk_platform", "resourcemanager_aidl_interface-ndk_platform", "resourceobserver_aidl_interface-ndk_platform", ], ], cflags: [ cflags: [ Loading media/libmediatranscoding/TranscoderWrapper.cpp +123 −90 Original line number Original line Diff line number Diff line Loading @@ -89,27 +89,41 @@ static AMediaFormat* getVideoFormat( } } //static //static const char* TranscoderWrapper::toString(Event::Type type) { std::string TranscoderWrapper::toString(const Event& event) { switch (type) { std::string typeStr; switch (event.type) { case Event::Start: case Event::Start: return "Start"; typeStr = "Start"; break; case Event::Pause: case Event::Pause: return "Pause"; typeStr = "Pause"; break; case Event::Resume: case Event::Resume: return "Resume"; typeStr = "Resume"; break; case Event::Stop: case Event::Stop: return "Stop"; typeStr = "Stop"; break; case Event::Finish: case Event::Finish: return "Finish"; typeStr = "Finish"; break; case Event::Error: case Event::Error: return "Error"; typeStr = "Error"; break; case Event::Progress: case Event::Progress: return "Progress"; typeStr = "Progress"; default: break; break; } default: return "(unknown)"; return "(unknown)"; } } std::string result; result = "job {" + std::to_string(event.clientId) + "," + std::to_string(event.jobId) + "}: " + typeStr; if (event.type == Event::Error || event.type == Event::Progress) { result += " " + std::to_string(event.arg); } return result; } class TranscoderWrapper::CallbackImpl : public MediaTranscoder::CallbackInterface { class TranscoderWrapper::CallbackImpl : public MediaTranscoder::CallbackInterface { public: public: Loading @@ -128,7 +142,7 @@ public: media_status_t error) override { media_status_t error) override { auto owner = mOwner.lock(); auto owner = mOwner.lock(); if (owner != nullptr) { if (owner != nullptr) { owner->onError(mClientId, mJobId, toTranscodingError(error)); owner->onError(mClientId, mJobId, error); } } } } Loading Loading @@ -160,20 +174,41 @@ void TranscoderWrapper::setCallback(const std::shared_ptr<TranscoderCallbackInte mCallback = cb; mCallback = cb; } } static bool isResourceError(media_status_t err) { return err == AMEDIACODEC_ERROR_RECLAIMED || err == AMEDIACODEC_ERROR_INSUFFICIENT_RESOURCE; } void TranscoderWrapper::reportError(ClientIdType clientId, JobIdType jobId, media_status_t err) { auto callback = mCallback.lock(); if (callback != nullptr) { if (isResourceError(err)) { // Add a placeholder pause state to mPausedStateMap. This is required when resuming. // TODO: remove this when transcoder pause/resume logic is ready. New logic will // no longer use the pause states. auto it = mPausedStateMap.find(JobKeyType(clientId, jobId)); if (it == mPausedStateMap.end()) { mPausedStateMap.emplace(JobKeyType(clientId, jobId), std::shared_ptr<const Parcel>()); } callback->onResourceLost(); } else { callback->onError(clientId, jobId, toTranscodingError(err)); } } } void TranscoderWrapper::start(ClientIdType clientId, JobIdType jobId, void TranscoderWrapper::start(ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb) { const std::shared_ptr<ITranscodingClientCallback>& clientCb) { queueEvent(Event::Start, clientId, jobId, [=] { queueEvent(Event::Start, clientId, jobId, [=] { TranscodingErrorCode err = handleStart(clientId, jobId, request, clientCb); media_status_t err = handleStart(clientId, jobId, request, clientCb); auto callback = mCallback.lock(); if (err != AMEDIA_OK) { if (err != TranscodingErrorCode::kNoError) { cleanup(); cleanup(); reportError(clientId, jobId, err); if (callback != nullptr) { callback->onError(clientId, jobId, err); } } else { } else { auto callback = mCallback.lock(); if (callback != nullptr) { if (callback != nullptr) { callback->onStarted(clientId, jobId); callback->onStarted(clientId, jobId); } } Loading @@ -183,15 +218,15 @@ void TranscoderWrapper::start(ClientIdType clientId, JobIdType jobId, void TranscoderWrapper::pause(ClientIdType clientId, JobIdType jobId) { void TranscoderWrapper::pause(ClientIdType clientId, JobIdType jobId) { queueEvent(Event::Pause, clientId, jobId, [=] { queueEvent(Event::Pause, clientId, jobId, [=] { TranscodingErrorCode err = handlePause(clientId, jobId); media_status_t err = handlePause(clientId, jobId); cleanup(); cleanup(); if (err != AMEDIA_OK) { reportError(clientId, jobId, err); } else { auto callback = mCallback.lock(); auto callback = mCallback.lock(); if (callback != nullptr) { if (callback != nullptr) { if (err != TranscodingErrorCode::kNoError) { callback->onError(clientId, jobId, err); } else { callback->onPaused(clientId, jobId); callback->onPaused(clientId, jobId); } } } } Loading @@ -202,16 +237,13 @@ void TranscoderWrapper::resume(ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb) { const std::shared_ptr<ITranscodingClientCallback>& clientCb) { queueEvent(Event::Resume, clientId, jobId, [=] { queueEvent(Event::Resume, clientId, jobId, [=] { TranscodingErrorCode err = handleResume(clientId, jobId, request, clientCb); media_status_t err = handleResume(clientId, jobId, request, clientCb); auto callback = mCallback.lock(); if (err != AMEDIA_OK) { if (err != TranscodingErrorCode::kNoError) { cleanup(); cleanup(); reportError(clientId, jobId, err); if (callback != nullptr) { callback->onError(clientId, jobId, err); } } else { } else { auto callback = mCallback.lock(); if (callback != nullptr) { if (callback != nullptr) { callback->onResumed(clientId, jobId); callback->onResumed(clientId, jobId); } } Loading @@ -225,7 +257,7 @@ void TranscoderWrapper::stop(ClientIdType clientId, JobIdType jobId) { // Cancelling the currently running job. // Cancelling the currently running job. media_status_t err = mTranscoder->cancel(); media_status_t err = mTranscoder->cancel(); if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("failed to stop transcoder: %d", err); ALOGW("failed to stop transcoder: %d", err); } else { } else { ALOGI("transcoder stopped"); ALOGI("transcoder stopped"); } } Loading @@ -251,41 +283,43 @@ void TranscoderWrapper::onFinish(ClientIdType clientId, JobIdType jobId) { }); }); } } void TranscoderWrapper::onError(ClientIdType clientId, JobIdType jobId, void TranscoderWrapper::onError(ClientIdType clientId, JobIdType jobId, media_status_t error) { TranscodingErrorCode error) { queueEvent( queueEvent(Event::Error, clientId, jobId, [=] { Event::Error, clientId, jobId, if (mTranscoder != nullptr && clientId == mCurrentClientId && jobId == mCurrentJobId) { [=] { if (mTranscoder != nullptr && clientId == mCurrentClientId && jobId == mCurrentJobId) { cleanup(); cleanup(); } } reportError(clientId, jobId, error); auto callback = mCallback.lock(); }, if (callback != nullptr) { error); callback->onError(clientId, jobId, error); } }); } } void TranscoderWrapper::onProgress(ClientIdType clientId, JobIdType jobId, int32_t progress) { void TranscoderWrapper::onProgress(ClientIdType clientId, JobIdType jobId, int32_t progress) { queueEvent(Event::Progress, clientId, jobId, [=] { queueEvent( Event::Progress, clientId, jobId, [=] { auto callback = mCallback.lock(); auto callback = mCallback.lock(); if (callback != nullptr) { if (callback != nullptr) { callback->onProgressUpdate(clientId, jobId, progress); callback->onProgressUpdate(clientId, jobId, progress); } } }); }, progress); } } TranscodingErrorCode TranscoderWrapper::setupTranscoder( media_status_t TranscoderWrapper::setupTranscoder( ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb, const std::shared_ptr<ITranscodingClientCallback>& clientCb, const std::shared_ptr<const Parcel>& pausedState) { const std::shared_ptr<const Parcel>& pausedState) { if (clientCb == nullptr) { if (clientCb == nullptr) { ALOGE("client callback is null"); ALOGE("client callback is null"); return TranscodingErrorCode::kInvalidParameter; return AMEDIA_ERROR_INVALID_PARAMETER; } } if (mTranscoder != nullptr) { if (mTranscoder != nullptr) { ALOGE("transcoder already running"); ALOGE("transcoder already running"); return TranscodingErrorCode::kInvalidOperation; return AMEDIA_ERROR_INVALID_OPERATION; } } Status status; Status status; Loading @@ -293,7 +327,7 @@ TranscodingErrorCode TranscoderWrapper::setupTranscoder( status = clientCb->openFileDescriptor(request.sourceFilePath, "r", &srcFd); status = clientCb->openFileDescriptor(request.sourceFilePath, "r", &srcFd); if (!status.isOk() || srcFd.get() < 0) { if (!status.isOk() || srcFd.get() < 0) { ALOGE("failed to open source"); ALOGE("failed to open source"); return TranscodingErrorCode::kErrorIO; return AMEDIA_ERROR_IO; } } // Open dest file with "rw", as the transcoder could potentially reuse part of it // Open dest file with "rw", as the transcoder could potentially reuse part of it Loading @@ -302,7 +336,7 @@ TranscodingErrorCode TranscoderWrapper::setupTranscoder( status = clientCb->openFileDescriptor(request.destinationFilePath, "rw", &dstFd); status = clientCb->openFileDescriptor(request.destinationFilePath, "rw", &dstFd); if (!status.isOk() || dstFd.get() < 0) { if (!status.isOk() || dstFd.get() < 0) { ALOGE("failed to open destination"); ALOGE("failed to open destination"); return TranscodingErrorCode::kErrorIO; return AMEDIA_ERROR_IO; } } mCurrentClientId = clientId; mCurrentClientId = clientId; Loading @@ -311,19 +345,19 @@ TranscodingErrorCode TranscoderWrapper::setupTranscoder( mTranscoder = MediaTranscoder::create(mTranscoderCb, pausedState); mTranscoder = MediaTranscoder::create(mTranscoderCb, pausedState); if (mTranscoder == nullptr) { if (mTranscoder == nullptr) { ALOGE("failed to create transcoder"); ALOGE("failed to create transcoder"); return TranscodingErrorCode::kUnknown; return AMEDIA_ERROR_UNKNOWN; } } media_status_t err = mTranscoder->configureSource(srcFd.get()); media_status_t err = mTranscoder->configureSource(srcFd.get()); if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("failed to configure source: %d", err); ALOGE("failed to configure source: %d", err); return toTranscodingError(err); return err; } } std::vector<std::shared_ptr<AMediaFormat>> trackFormats = mTranscoder->getTrackFormats(); std::vector<std::shared_ptr<AMediaFormat>> trackFormats = mTranscoder->getTrackFormats(); if (trackFormats.size() == 0) { if (trackFormats.size() == 0) { ALOGE("failed to get track formats!"); ALOGE("failed to get track formats!"); return TranscodingErrorCode::kMalformed; return AMEDIA_ERROR_MALFORMED; } } for (int i = 0; i < trackFormats.size(); ++i) { for (int i = 0; i < trackFormats.size(); ++i) { Loading @@ -341,43 +375,43 @@ TranscodingErrorCode TranscoderWrapper::setupTranscoder( } } if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("failed to configure track format for track %d: %d", i, err); ALOGE("failed to configure track format for track %d: %d", i, err); return toTranscodingError(err); return err; } } } } err = mTranscoder->configureDestination(dstFd.get()); err = mTranscoder->configureDestination(dstFd.get()); if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("failed to configure dest: %d", err); ALOGE("failed to configure dest: %d", err); return toTranscodingError(err); return err; } } return TranscodingErrorCode::kNoError; return AMEDIA_OK; } } TranscodingErrorCode TranscoderWrapper::handleStart( media_status_t TranscoderWrapper::handleStart( ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb) { const std::shared_ptr<ITranscodingClientCallback>& clientCb) { ALOGI("setting up transcoder for start"); ALOGI("%s: setting up transcoder for start", __FUNCTION__); TranscodingErrorCode err = setupTranscoder(clientId, jobId, request, clientCb); media_status_t err = setupTranscoder(clientId, jobId, request, clientCb); if (err != TranscodingErrorCode::kNoError) { if (err != AMEDIA_OK) { ALOGI("%s: failed to setup transcoder", __FUNCTION__); ALOGI("%s: failed to setup transcoder", __FUNCTION__); return err; return err; } } media_status_t status = mTranscoder->start(); err = mTranscoder->start(); if (status != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("%s: failed to start transcoder: %d", __FUNCTION__, err); ALOGE("%s: failed to start transcoder: %d", __FUNCTION__, err); return toTranscodingError(status); return err; } } ALOGI("%s: transcoder started", __FUNCTION__); ALOGI("%s: transcoder started", __FUNCTION__); return TranscodingErrorCode::kNoError; return AMEDIA_OK; } } TranscodingErrorCode TranscoderWrapper::handlePause(ClientIdType clientId, JobIdType jobId) { media_status_t TranscoderWrapper::handlePause(ClientIdType clientId, JobIdType jobId) { if (mTranscoder == nullptr) { if (mTranscoder == nullptr) { ALOGE("%s: transcoder is not running", __FUNCTION__); ALOGE("%s: transcoder is not running", __FUNCTION__); return TranscodingErrorCode::kInvalidOperation; return AMEDIA_ERROR_INVALID_OPERATION; } } if (clientId != mCurrentClientId || jobId != mCurrentJobId) { if (clientId != mCurrentClientId || jobId != mCurrentJobId) { Loading @@ -385,19 +419,21 @@ TranscodingErrorCode TranscoderWrapper::handlePause(ClientIdType clientId, JobId (long long)clientId, jobId, (long long)mCurrentClientId, mCurrentJobId); (long long)clientId, jobId, (long long)mCurrentClientId, mCurrentJobId); } } ALOGI("%s: pausing transcoder", __FUNCTION__); std::shared_ptr<const Parcel> pauseStates; std::shared_ptr<const Parcel> pauseStates; media_status_t err = mTranscoder->pause(&pauseStates); media_status_t err = mTranscoder->pause(&pauseStates); if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("%s: failed to pause transcoder: %d", __FUNCTION__, err); ALOGE("%s: failed to pause transcoder: %d", __FUNCTION__, err); return toTranscodingError(err); return err; } } mPausedStateMap[JobKeyType(clientId, jobId)] = pauseStates; mPausedStateMap[JobKeyType(clientId, jobId)] = pauseStates; ALOGI("%s: transcoder paused", __FUNCTION__); ALOGI("%s: transcoder paused", __FUNCTION__); return TranscodingErrorCode::kNoError; return AMEDIA_OK; } } TranscodingErrorCode TranscoderWrapper::handleResume( media_status_t TranscoderWrapper::handleResume( ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb) { const std::shared_ptr<ITranscodingClientCallback>& clientCb) { std::shared_ptr<const Parcel> pausedState; std::shared_ptr<const Parcel> pausedState; Loading @@ -407,24 +443,24 @@ TranscodingErrorCode TranscoderWrapper::handleResume( mPausedStateMap.erase(it); mPausedStateMap.erase(it); } else { } else { ALOGE("%s: can't find paused state", __FUNCTION__); ALOGE("%s: can't find paused state", __FUNCTION__); return TranscodingErrorCode::kInvalidOperation; return AMEDIA_ERROR_INVALID_OPERATION; } } ALOGI("setting up transcoder for resume"); ALOGI("%s: setting up transcoder for resume", __FUNCTION__); TranscodingErrorCode err = setupTranscoder(clientId, jobId, request, clientCb, pausedState); media_status_t err = setupTranscoder(clientId, jobId, request, clientCb, pausedState); if (err != TranscodingErrorCode::kNoError) { if (err != AMEDIA_OK) { ALOGE("%s: failed to setup transcoder", __FUNCTION__); ALOGE("%s: failed to setup transcoder: %d", __FUNCTION__, err); return err; return err; } } media_status_t status = mTranscoder->resume(); err = mTranscoder->resume(); if (status != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("%s: failed to resume transcoder: %d", __FUNCTION__, err); ALOGE("%s: failed to resume transcoder: %d", __FUNCTION__, err); return toTranscodingError(status); return err; } } ALOGI("%s: transcoder resumed", __FUNCTION__); ALOGI("%s: transcoder resumed", __FUNCTION__); return TranscodingErrorCode::kNoError; return AMEDIA_OK; } } void TranscoderWrapper::cleanup() { void TranscoderWrapper::cleanup() { Loading @@ -435,12 +471,10 @@ void TranscoderWrapper::cleanup() { } } void TranscoderWrapper::queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId, void TranscoderWrapper::queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId, const std::function<void()> runnable) { const std::function<void()> runnable, int32_t arg) { ALOGV("%s: job {%lld, %d}: %s", __FUNCTION__, (long long)clientId, jobId, toString(type)); std::scoped_lock lock{mLock}; std::scoped_lock lock{mLock}; mQueue.push_back({type, clientId, jobId, runnable}); mQueue.push_back({type, clientId, jobId, runnable, arg}); mCondition.notify_one(); mCondition.notify_one(); } } Loading @@ -457,8 +491,7 @@ void TranscoderWrapper::threadLoop() { Event event = *mQueue.begin(); Event event = *mQueue.begin(); mQueue.pop_front(); mQueue.pop_front(); ALOGD("%s: job {%lld, %d}: %s", __FUNCTION__, (long long)event.clientId, event.jobId, ALOGD("%s: %s", __FUNCTION__, toString(event).c_str()); toString(event.type)); lock.unlock(); lock.unlock(); event.runnable(); event.runnable(); Loading media/libmediatranscoding/TranscodingJobScheduler.cpp +23 −5 Original line number Original line Diff line number Diff line Loading @@ -38,8 +38,13 @@ String8 TranscodingJobScheduler::jobToString(const JobKeyType& jobKey) { TranscodingJobScheduler::TranscodingJobScheduler( TranscodingJobScheduler::TranscodingJobScheduler( const std::shared_ptr<TranscoderInterface>& transcoder, const std::shared_ptr<TranscoderInterface>& transcoder, const std::shared_ptr<UidPolicyInterface>& uidPolicy) const std::shared_ptr<UidPolicyInterface>& uidPolicy, : mTranscoder(transcoder), mUidPolicy(uidPolicy), mCurrentJob(nullptr), mResourceLost(false) { const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy) : mTranscoder(transcoder), mUidPolicy(uidPolicy), mResourcePolicy(resourcePolicy), mCurrentJob(nullptr), mResourceLost(false) { // Only push empty offline queue initially. Realtime queues are added when requests come in. // Only push empty offline queue initially. Realtime queues are added when requests come in. mUidSortedList.push_back(OFFLINE_UID); mUidSortedList.push_back(OFFLINE_UID); mOfflineUidIterator = mUidSortedList.begin(); mOfflineUidIterator = mUidSortedList.begin(); Loading Loading @@ -398,15 +403,24 @@ void TranscodingJobScheduler::onProgressUpdate(ClientIdType clientId, JobIdType } } void TranscodingJobScheduler::onResourceLost() { void TranscodingJobScheduler::onResourceLost() { ALOGV("%s", __FUNCTION__); ALOGI("%s", __FUNCTION__); std::scoped_lock lock{mLock}; std::scoped_lock lock{mLock}; if (mResourceLost) { return; } // If we receive a resource loss event, the TranscoderLibrary already paused // If we receive a resource loss event, the TranscoderLibrary already paused // the transcoding, so we don't need to call onPaused to notify it to pause. // the transcoding, so we don't need to call onPaused to notify it to pause. // Only need to update the job state here. // Only need to update the job state here. if (mCurrentJob != nullptr && mCurrentJob->state == Job::RUNNING) { if (mCurrentJob != nullptr && mCurrentJob->state == Job::RUNNING) { mCurrentJob->state = Job::PAUSED; mCurrentJob->state = Job::PAUSED; // Notify the client as a paused event. auto clientCallback = mCurrentJob->callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingPaused(mCurrentJob->key.second); } } } mResourceLost = true; mResourceLost = true; Loading Loading @@ -439,10 +453,14 @@ void TranscodingJobScheduler::onTopUidsChanged(const std::unordered_set<uid_t>& } } void TranscodingJobScheduler::onResourceAvailable() { void TranscodingJobScheduler::onResourceAvailable() { ALOGV("%s", __FUNCTION__); std::scoped_lock lock{mLock}; std::scoped_lock lock{mLock}; if (!mResourceLost) { return; } ALOGI("%s", __FUNCTION__); mResourceLost = false; mResourceLost = false; updateCurrentJob_l(); updateCurrentJob_l(); Loading media/libmediatranscoding/TranscodingResourcePolicy.cpp 0 → 100644 +169 −0 Original line number Original line Diff line number Diff line /* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "TranscodingResourcePolicy" #include <aidl/android/media/BnResourceObserver.h> #include <aidl/android/media/IResourceObserverService.h> #include <android/binder_manager.h> #include <android/binder_process.h> #include <binder/IServiceManager.h> #include <media/TranscodingResourcePolicy.h> #include <utils/Log.h> namespace android { using Status = ::ndk::ScopedAStatus; using ::aidl::android::media::BnResourceObserver; using ::aidl::android::media::IResourceObserverService; using ::aidl::android::media::MediaObservableEvent; using ::aidl::android::media::MediaObservableFilter; using ::aidl::android::media::MediaObservableParcel; using ::aidl::android::media::MediaObservableType; static std::string toString(const MediaObservableParcel& observable) { return "{" + ::aidl::android::media::toString(observable.type) + ", " + std::to_string(observable.value) + "}"; } struct TranscodingResourcePolicy::ResourceObserver : public BnResourceObserver { explicit ResourceObserver(TranscodingResourcePolicy* owner) : mOwner(owner), mPid(getpid()) {} // IResourceObserver ::ndk::ScopedAStatus onStatusChanged( MediaObservableEvent event, int32_t uid, int32_t pid, const std::vector<MediaObservableParcel>& observables) override { ALOGD("%s: %s, uid %d, pid %d, %s", __FUNCTION__, ::aidl::android::media::toString(event).c_str(), uid, pid, toString(observables[0]).c_str()); // Only report kIdle event for codec resources from other processes. if (((uint64_t)event & (uint64_t)MediaObservableEvent::kIdle) != 0 && (pid != mPid)) { for (auto& observable : observables) { if (observable.type == MediaObservableType::kVideoSecureCodec || observable.type == MediaObservableType::kVideoNonSecureCodec) { mOwner->onResourceAvailable(); break; } } } return ::ndk::ScopedAStatus::ok(); } TranscodingResourcePolicy* mOwner; const pid_t mPid; }; // static void TranscodingResourcePolicy::BinderDiedCallback(void* cookie) { TranscodingResourcePolicy* owner = reinterpret_cast<TranscodingResourcePolicy*>(cookie); if (owner != nullptr) { owner->unregisterSelf(); } // TODO(chz): retry to connecting to IResourceObserverService after failure. // Also need to have back-up logic if IResourceObserverService is offline for // Prolonged period of time. A possible alternative could be, during period where // IResourceObserverService is not available, trigger onResourceAvailable() everytime // when top uid changes (in hope that'll free up some codec instances that we could // reclaim). } TranscodingResourcePolicy::TranscodingResourcePolicy() : mRegistered(false), mDeathRecipient(AIBinder_DeathRecipient_new(BinderDiedCallback)) { registerSelf(); } TranscodingResourcePolicy::~TranscodingResourcePolicy() { unregisterSelf(); } void TranscodingResourcePolicy::registerSelf() { ALOGI("TranscodingResourcePolicy: registerSelf"); ::ndk::SpAIBinder binder(AServiceManager_getService("media.resource_observer")); std::scoped_lock lock{mRegisteredLock}; if (mRegistered) { return; } // TODO(chz): retry to connecting to IResourceObserverService after failure. mService = IResourceObserverService::fromBinder(binder); if (mService == nullptr) { ALOGE("Failed to get IResourceObserverService"); return; } // Only register filters for codec resource available. mObserver = ::ndk::SharedRefBase::make<ResourceObserver>(this); std::vector<MediaObservableFilter> filters = { {MediaObservableType::kVideoSecureCodec, MediaObservableEvent::kIdle}, {MediaObservableType::kVideoNonSecureCodec, MediaObservableEvent::kIdle}}; Status status = mService->registerObserver(mObserver, filters); if (!status.isOk()) { ALOGE("failed to register: error %d", status.getServiceSpecificError()); mService = nullptr; mObserver = nullptr; return; } AIBinder_linkToDeath(binder.get(), mDeathRecipient.get(), reinterpret_cast<void*>(this)); ALOGD("@@@ registered observer"); mRegistered = true; } void TranscodingResourcePolicy::unregisterSelf() { ALOGI("TranscodingResourcePolicy: unregisterSelf"); std::scoped_lock lock{mRegisteredLock}; if (!mRegistered) { return; } ::ndk::SpAIBinder binder = mService->asBinder(); if (binder.get() != nullptr) { Status status = mService->unregisterObserver(mObserver); AIBinder_unlinkToDeath(binder.get(), mDeathRecipient.get(), reinterpret_cast<void*>(this)); } mService = nullptr; mObserver = nullptr; mRegistered = false; } void TranscodingResourcePolicy::setCallback( const std::shared_ptr<ResourcePolicyCallbackInterface>& cb) { std::scoped_lock lock{mCallbackLock}; mResourcePolicyCallback = cb; } void TranscodingResourcePolicy::onResourceAvailable() { std::shared_ptr<ResourcePolicyCallbackInterface> cb; { std::scoped_lock lock{mCallbackLock}; cb = mResourcePolicyCallback.lock(); } if (cb != nullptr) { cb->onResourceAvailable(); } } } // namespace android media/libmediatranscoding/include/media/ResourcePolicyInterface.h 0 → 100644 +48 −0 Original line number Original line Diff line number Diff line /* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANDROID_MEDIA_RESOURCE_POLICY_INTERFACE_H #define ANDROID_MEDIA_RESOURCE_POLICY_INTERFACE_H #include <memory> namespace android { class ResourcePolicyCallbackInterface; // Interface for the JobScheduler to control the resource status updates. class ResourcePolicyInterface { public: // Set the associated callback interface to send the events when resource // status changes. (Set to nullptr will stop the updates.) virtual void setCallback(const std::shared_ptr<ResourcePolicyCallbackInterface>& cb) = 0; protected: virtual ~ResourcePolicyInterface() = default; }; // Interface for notifying the JobScheduler of a change in resource status. class ResourcePolicyCallbackInterface { public: // Called when codec resources become available. The scheduler may use this // as a signal to attempt restart transcoding jobs that were previously // paused due to temporary resource loss. virtual void onResourceAvailable() = 0; protected: virtual ~ResourcePolicyCallbackInterface() = default; }; } // namespace android #endif // ANDROID_MEDIA_RESOURCE_POLICY_INTERFACE_H Loading
media/libmediatranscoding/Android.bp +5 −0 Original line number Original line Diff line number Diff line Loading @@ -54,6 +54,7 @@ cc_library_shared { srcs: [ srcs: [ "TranscodingClientManager.cpp", "TranscodingClientManager.cpp", "TranscodingJobScheduler.cpp", "TranscodingJobScheduler.cpp", "TranscodingResourcePolicy.cpp", "TranscodingUidPolicy.cpp", "TranscodingUidPolicy.cpp", "TranscoderWrapper.cpp", "TranscoderWrapper.cpp", ], ], Loading @@ -67,12 +68,16 @@ cc_library_shared { "libbinder", "libbinder", "libmediandk", "libmediandk", ], ], export_shared_lib_headers: [ "libmediandk", ], export_include_dirs: ["include"], export_include_dirs: ["include"], static_libs: [ static_libs: [ "mediatranscoding_aidl_interface-ndk_platform", "mediatranscoding_aidl_interface-ndk_platform", "resourcemanager_aidl_interface-ndk_platform", "resourcemanager_aidl_interface-ndk_platform", "resourceobserver_aidl_interface-ndk_platform", ], ], cflags: [ cflags: [ Loading
media/libmediatranscoding/TranscoderWrapper.cpp +123 −90 Original line number Original line Diff line number Diff line Loading @@ -89,27 +89,41 @@ static AMediaFormat* getVideoFormat( } } //static //static const char* TranscoderWrapper::toString(Event::Type type) { std::string TranscoderWrapper::toString(const Event& event) { switch (type) { std::string typeStr; switch (event.type) { case Event::Start: case Event::Start: return "Start"; typeStr = "Start"; break; case Event::Pause: case Event::Pause: return "Pause"; typeStr = "Pause"; break; case Event::Resume: case Event::Resume: return "Resume"; typeStr = "Resume"; break; case Event::Stop: case Event::Stop: return "Stop"; typeStr = "Stop"; break; case Event::Finish: case Event::Finish: return "Finish"; typeStr = "Finish"; break; case Event::Error: case Event::Error: return "Error"; typeStr = "Error"; break; case Event::Progress: case Event::Progress: return "Progress"; typeStr = "Progress"; default: break; break; } default: return "(unknown)"; return "(unknown)"; } } std::string result; result = "job {" + std::to_string(event.clientId) + "," + std::to_string(event.jobId) + "}: " + typeStr; if (event.type == Event::Error || event.type == Event::Progress) { result += " " + std::to_string(event.arg); } return result; } class TranscoderWrapper::CallbackImpl : public MediaTranscoder::CallbackInterface { class TranscoderWrapper::CallbackImpl : public MediaTranscoder::CallbackInterface { public: public: Loading @@ -128,7 +142,7 @@ public: media_status_t error) override { media_status_t error) override { auto owner = mOwner.lock(); auto owner = mOwner.lock(); if (owner != nullptr) { if (owner != nullptr) { owner->onError(mClientId, mJobId, toTranscodingError(error)); owner->onError(mClientId, mJobId, error); } } } } Loading Loading @@ -160,20 +174,41 @@ void TranscoderWrapper::setCallback(const std::shared_ptr<TranscoderCallbackInte mCallback = cb; mCallback = cb; } } static bool isResourceError(media_status_t err) { return err == AMEDIACODEC_ERROR_RECLAIMED || err == AMEDIACODEC_ERROR_INSUFFICIENT_RESOURCE; } void TranscoderWrapper::reportError(ClientIdType clientId, JobIdType jobId, media_status_t err) { auto callback = mCallback.lock(); if (callback != nullptr) { if (isResourceError(err)) { // Add a placeholder pause state to mPausedStateMap. This is required when resuming. // TODO: remove this when transcoder pause/resume logic is ready. New logic will // no longer use the pause states. auto it = mPausedStateMap.find(JobKeyType(clientId, jobId)); if (it == mPausedStateMap.end()) { mPausedStateMap.emplace(JobKeyType(clientId, jobId), std::shared_ptr<const Parcel>()); } callback->onResourceLost(); } else { callback->onError(clientId, jobId, toTranscodingError(err)); } } } void TranscoderWrapper::start(ClientIdType clientId, JobIdType jobId, void TranscoderWrapper::start(ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb) { const std::shared_ptr<ITranscodingClientCallback>& clientCb) { queueEvent(Event::Start, clientId, jobId, [=] { queueEvent(Event::Start, clientId, jobId, [=] { TranscodingErrorCode err = handleStart(clientId, jobId, request, clientCb); media_status_t err = handleStart(clientId, jobId, request, clientCb); auto callback = mCallback.lock(); if (err != AMEDIA_OK) { if (err != TranscodingErrorCode::kNoError) { cleanup(); cleanup(); reportError(clientId, jobId, err); if (callback != nullptr) { callback->onError(clientId, jobId, err); } } else { } else { auto callback = mCallback.lock(); if (callback != nullptr) { if (callback != nullptr) { callback->onStarted(clientId, jobId); callback->onStarted(clientId, jobId); } } Loading @@ -183,15 +218,15 @@ void TranscoderWrapper::start(ClientIdType clientId, JobIdType jobId, void TranscoderWrapper::pause(ClientIdType clientId, JobIdType jobId) { void TranscoderWrapper::pause(ClientIdType clientId, JobIdType jobId) { queueEvent(Event::Pause, clientId, jobId, [=] { queueEvent(Event::Pause, clientId, jobId, [=] { TranscodingErrorCode err = handlePause(clientId, jobId); media_status_t err = handlePause(clientId, jobId); cleanup(); cleanup(); if (err != AMEDIA_OK) { reportError(clientId, jobId, err); } else { auto callback = mCallback.lock(); auto callback = mCallback.lock(); if (callback != nullptr) { if (callback != nullptr) { if (err != TranscodingErrorCode::kNoError) { callback->onError(clientId, jobId, err); } else { callback->onPaused(clientId, jobId); callback->onPaused(clientId, jobId); } } } } Loading @@ -202,16 +237,13 @@ void TranscoderWrapper::resume(ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb) { const std::shared_ptr<ITranscodingClientCallback>& clientCb) { queueEvent(Event::Resume, clientId, jobId, [=] { queueEvent(Event::Resume, clientId, jobId, [=] { TranscodingErrorCode err = handleResume(clientId, jobId, request, clientCb); media_status_t err = handleResume(clientId, jobId, request, clientCb); auto callback = mCallback.lock(); if (err != AMEDIA_OK) { if (err != TranscodingErrorCode::kNoError) { cleanup(); cleanup(); reportError(clientId, jobId, err); if (callback != nullptr) { callback->onError(clientId, jobId, err); } } else { } else { auto callback = mCallback.lock(); if (callback != nullptr) { if (callback != nullptr) { callback->onResumed(clientId, jobId); callback->onResumed(clientId, jobId); } } Loading @@ -225,7 +257,7 @@ void TranscoderWrapper::stop(ClientIdType clientId, JobIdType jobId) { // Cancelling the currently running job. // Cancelling the currently running job. media_status_t err = mTranscoder->cancel(); media_status_t err = mTranscoder->cancel(); if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("failed to stop transcoder: %d", err); ALOGW("failed to stop transcoder: %d", err); } else { } else { ALOGI("transcoder stopped"); ALOGI("transcoder stopped"); } } Loading @@ -251,41 +283,43 @@ void TranscoderWrapper::onFinish(ClientIdType clientId, JobIdType jobId) { }); }); } } void TranscoderWrapper::onError(ClientIdType clientId, JobIdType jobId, void TranscoderWrapper::onError(ClientIdType clientId, JobIdType jobId, media_status_t error) { TranscodingErrorCode error) { queueEvent( queueEvent(Event::Error, clientId, jobId, [=] { Event::Error, clientId, jobId, if (mTranscoder != nullptr && clientId == mCurrentClientId && jobId == mCurrentJobId) { [=] { if (mTranscoder != nullptr && clientId == mCurrentClientId && jobId == mCurrentJobId) { cleanup(); cleanup(); } } reportError(clientId, jobId, error); auto callback = mCallback.lock(); }, if (callback != nullptr) { error); callback->onError(clientId, jobId, error); } }); } } void TranscoderWrapper::onProgress(ClientIdType clientId, JobIdType jobId, int32_t progress) { void TranscoderWrapper::onProgress(ClientIdType clientId, JobIdType jobId, int32_t progress) { queueEvent(Event::Progress, clientId, jobId, [=] { queueEvent( Event::Progress, clientId, jobId, [=] { auto callback = mCallback.lock(); auto callback = mCallback.lock(); if (callback != nullptr) { if (callback != nullptr) { callback->onProgressUpdate(clientId, jobId, progress); callback->onProgressUpdate(clientId, jobId, progress); } } }); }, progress); } } TranscodingErrorCode TranscoderWrapper::setupTranscoder( media_status_t TranscoderWrapper::setupTranscoder( ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb, const std::shared_ptr<ITranscodingClientCallback>& clientCb, const std::shared_ptr<const Parcel>& pausedState) { const std::shared_ptr<const Parcel>& pausedState) { if (clientCb == nullptr) { if (clientCb == nullptr) { ALOGE("client callback is null"); ALOGE("client callback is null"); return TranscodingErrorCode::kInvalidParameter; return AMEDIA_ERROR_INVALID_PARAMETER; } } if (mTranscoder != nullptr) { if (mTranscoder != nullptr) { ALOGE("transcoder already running"); ALOGE("transcoder already running"); return TranscodingErrorCode::kInvalidOperation; return AMEDIA_ERROR_INVALID_OPERATION; } } Status status; Status status; Loading @@ -293,7 +327,7 @@ TranscodingErrorCode TranscoderWrapper::setupTranscoder( status = clientCb->openFileDescriptor(request.sourceFilePath, "r", &srcFd); status = clientCb->openFileDescriptor(request.sourceFilePath, "r", &srcFd); if (!status.isOk() || srcFd.get() < 0) { if (!status.isOk() || srcFd.get() < 0) { ALOGE("failed to open source"); ALOGE("failed to open source"); return TranscodingErrorCode::kErrorIO; return AMEDIA_ERROR_IO; } } // Open dest file with "rw", as the transcoder could potentially reuse part of it // Open dest file with "rw", as the transcoder could potentially reuse part of it Loading @@ -302,7 +336,7 @@ TranscodingErrorCode TranscoderWrapper::setupTranscoder( status = clientCb->openFileDescriptor(request.destinationFilePath, "rw", &dstFd); status = clientCb->openFileDescriptor(request.destinationFilePath, "rw", &dstFd); if (!status.isOk() || dstFd.get() < 0) { if (!status.isOk() || dstFd.get() < 0) { ALOGE("failed to open destination"); ALOGE("failed to open destination"); return TranscodingErrorCode::kErrorIO; return AMEDIA_ERROR_IO; } } mCurrentClientId = clientId; mCurrentClientId = clientId; Loading @@ -311,19 +345,19 @@ TranscodingErrorCode TranscoderWrapper::setupTranscoder( mTranscoder = MediaTranscoder::create(mTranscoderCb, pausedState); mTranscoder = MediaTranscoder::create(mTranscoderCb, pausedState); if (mTranscoder == nullptr) { if (mTranscoder == nullptr) { ALOGE("failed to create transcoder"); ALOGE("failed to create transcoder"); return TranscodingErrorCode::kUnknown; return AMEDIA_ERROR_UNKNOWN; } } media_status_t err = mTranscoder->configureSource(srcFd.get()); media_status_t err = mTranscoder->configureSource(srcFd.get()); if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("failed to configure source: %d", err); ALOGE("failed to configure source: %d", err); return toTranscodingError(err); return err; } } std::vector<std::shared_ptr<AMediaFormat>> trackFormats = mTranscoder->getTrackFormats(); std::vector<std::shared_ptr<AMediaFormat>> trackFormats = mTranscoder->getTrackFormats(); if (trackFormats.size() == 0) { if (trackFormats.size() == 0) { ALOGE("failed to get track formats!"); ALOGE("failed to get track formats!"); return TranscodingErrorCode::kMalformed; return AMEDIA_ERROR_MALFORMED; } } for (int i = 0; i < trackFormats.size(); ++i) { for (int i = 0; i < trackFormats.size(); ++i) { Loading @@ -341,43 +375,43 @@ TranscodingErrorCode TranscoderWrapper::setupTranscoder( } } if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("failed to configure track format for track %d: %d", i, err); ALOGE("failed to configure track format for track %d: %d", i, err); return toTranscodingError(err); return err; } } } } err = mTranscoder->configureDestination(dstFd.get()); err = mTranscoder->configureDestination(dstFd.get()); if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("failed to configure dest: %d", err); ALOGE("failed to configure dest: %d", err); return toTranscodingError(err); return err; } } return TranscodingErrorCode::kNoError; return AMEDIA_OK; } } TranscodingErrorCode TranscoderWrapper::handleStart( media_status_t TranscoderWrapper::handleStart( ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb) { const std::shared_ptr<ITranscodingClientCallback>& clientCb) { ALOGI("setting up transcoder for start"); ALOGI("%s: setting up transcoder for start", __FUNCTION__); TranscodingErrorCode err = setupTranscoder(clientId, jobId, request, clientCb); media_status_t err = setupTranscoder(clientId, jobId, request, clientCb); if (err != TranscodingErrorCode::kNoError) { if (err != AMEDIA_OK) { ALOGI("%s: failed to setup transcoder", __FUNCTION__); ALOGI("%s: failed to setup transcoder", __FUNCTION__); return err; return err; } } media_status_t status = mTranscoder->start(); err = mTranscoder->start(); if (status != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("%s: failed to start transcoder: %d", __FUNCTION__, err); ALOGE("%s: failed to start transcoder: %d", __FUNCTION__, err); return toTranscodingError(status); return err; } } ALOGI("%s: transcoder started", __FUNCTION__); ALOGI("%s: transcoder started", __FUNCTION__); return TranscodingErrorCode::kNoError; return AMEDIA_OK; } } TranscodingErrorCode TranscoderWrapper::handlePause(ClientIdType clientId, JobIdType jobId) { media_status_t TranscoderWrapper::handlePause(ClientIdType clientId, JobIdType jobId) { if (mTranscoder == nullptr) { if (mTranscoder == nullptr) { ALOGE("%s: transcoder is not running", __FUNCTION__); ALOGE("%s: transcoder is not running", __FUNCTION__); return TranscodingErrorCode::kInvalidOperation; return AMEDIA_ERROR_INVALID_OPERATION; } } if (clientId != mCurrentClientId || jobId != mCurrentJobId) { if (clientId != mCurrentClientId || jobId != mCurrentJobId) { Loading @@ -385,19 +419,21 @@ TranscodingErrorCode TranscoderWrapper::handlePause(ClientIdType clientId, JobId (long long)clientId, jobId, (long long)mCurrentClientId, mCurrentJobId); (long long)clientId, jobId, (long long)mCurrentClientId, mCurrentJobId); } } ALOGI("%s: pausing transcoder", __FUNCTION__); std::shared_ptr<const Parcel> pauseStates; std::shared_ptr<const Parcel> pauseStates; media_status_t err = mTranscoder->pause(&pauseStates); media_status_t err = mTranscoder->pause(&pauseStates); if (err != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("%s: failed to pause transcoder: %d", __FUNCTION__, err); ALOGE("%s: failed to pause transcoder: %d", __FUNCTION__, err); return toTranscodingError(err); return err; } } mPausedStateMap[JobKeyType(clientId, jobId)] = pauseStates; mPausedStateMap[JobKeyType(clientId, jobId)] = pauseStates; ALOGI("%s: transcoder paused", __FUNCTION__); ALOGI("%s: transcoder paused", __FUNCTION__); return TranscodingErrorCode::kNoError; return AMEDIA_OK; } } TranscodingErrorCode TranscoderWrapper::handleResume( media_status_t TranscoderWrapper::handleResume( ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, ClientIdType clientId, JobIdType jobId, const TranscodingRequestParcel& request, const std::shared_ptr<ITranscodingClientCallback>& clientCb) { const std::shared_ptr<ITranscodingClientCallback>& clientCb) { std::shared_ptr<const Parcel> pausedState; std::shared_ptr<const Parcel> pausedState; Loading @@ -407,24 +443,24 @@ TranscodingErrorCode TranscoderWrapper::handleResume( mPausedStateMap.erase(it); mPausedStateMap.erase(it); } else { } else { ALOGE("%s: can't find paused state", __FUNCTION__); ALOGE("%s: can't find paused state", __FUNCTION__); return TranscodingErrorCode::kInvalidOperation; return AMEDIA_ERROR_INVALID_OPERATION; } } ALOGI("setting up transcoder for resume"); ALOGI("%s: setting up transcoder for resume", __FUNCTION__); TranscodingErrorCode err = setupTranscoder(clientId, jobId, request, clientCb, pausedState); media_status_t err = setupTranscoder(clientId, jobId, request, clientCb, pausedState); if (err != TranscodingErrorCode::kNoError) { if (err != AMEDIA_OK) { ALOGE("%s: failed to setup transcoder", __FUNCTION__); ALOGE("%s: failed to setup transcoder: %d", __FUNCTION__, err); return err; return err; } } media_status_t status = mTranscoder->resume(); err = mTranscoder->resume(); if (status != AMEDIA_OK) { if (err != AMEDIA_OK) { ALOGE("%s: failed to resume transcoder: %d", __FUNCTION__, err); ALOGE("%s: failed to resume transcoder: %d", __FUNCTION__, err); return toTranscodingError(status); return err; } } ALOGI("%s: transcoder resumed", __FUNCTION__); ALOGI("%s: transcoder resumed", __FUNCTION__); return TranscodingErrorCode::kNoError; return AMEDIA_OK; } } void TranscoderWrapper::cleanup() { void TranscoderWrapper::cleanup() { Loading @@ -435,12 +471,10 @@ void TranscoderWrapper::cleanup() { } } void TranscoderWrapper::queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId, void TranscoderWrapper::queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId, const std::function<void()> runnable) { const std::function<void()> runnable, int32_t arg) { ALOGV("%s: job {%lld, %d}: %s", __FUNCTION__, (long long)clientId, jobId, toString(type)); std::scoped_lock lock{mLock}; std::scoped_lock lock{mLock}; mQueue.push_back({type, clientId, jobId, runnable}); mQueue.push_back({type, clientId, jobId, runnable, arg}); mCondition.notify_one(); mCondition.notify_one(); } } Loading @@ -457,8 +491,7 @@ void TranscoderWrapper::threadLoop() { Event event = *mQueue.begin(); Event event = *mQueue.begin(); mQueue.pop_front(); mQueue.pop_front(); ALOGD("%s: job {%lld, %d}: %s", __FUNCTION__, (long long)event.clientId, event.jobId, ALOGD("%s: %s", __FUNCTION__, toString(event).c_str()); toString(event.type)); lock.unlock(); lock.unlock(); event.runnable(); event.runnable(); Loading
media/libmediatranscoding/TranscodingJobScheduler.cpp +23 −5 Original line number Original line Diff line number Diff line Loading @@ -38,8 +38,13 @@ String8 TranscodingJobScheduler::jobToString(const JobKeyType& jobKey) { TranscodingJobScheduler::TranscodingJobScheduler( TranscodingJobScheduler::TranscodingJobScheduler( const std::shared_ptr<TranscoderInterface>& transcoder, const std::shared_ptr<TranscoderInterface>& transcoder, const std::shared_ptr<UidPolicyInterface>& uidPolicy) const std::shared_ptr<UidPolicyInterface>& uidPolicy, : mTranscoder(transcoder), mUidPolicy(uidPolicy), mCurrentJob(nullptr), mResourceLost(false) { const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy) : mTranscoder(transcoder), mUidPolicy(uidPolicy), mResourcePolicy(resourcePolicy), mCurrentJob(nullptr), mResourceLost(false) { // Only push empty offline queue initially. Realtime queues are added when requests come in. // Only push empty offline queue initially. Realtime queues are added when requests come in. mUidSortedList.push_back(OFFLINE_UID); mUidSortedList.push_back(OFFLINE_UID); mOfflineUidIterator = mUidSortedList.begin(); mOfflineUidIterator = mUidSortedList.begin(); Loading Loading @@ -398,15 +403,24 @@ void TranscodingJobScheduler::onProgressUpdate(ClientIdType clientId, JobIdType } } void TranscodingJobScheduler::onResourceLost() { void TranscodingJobScheduler::onResourceLost() { ALOGV("%s", __FUNCTION__); ALOGI("%s", __FUNCTION__); std::scoped_lock lock{mLock}; std::scoped_lock lock{mLock}; if (mResourceLost) { return; } // If we receive a resource loss event, the TranscoderLibrary already paused // If we receive a resource loss event, the TranscoderLibrary already paused // the transcoding, so we don't need to call onPaused to notify it to pause. // the transcoding, so we don't need to call onPaused to notify it to pause. // Only need to update the job state here. // Only need to update the job state here. if (mCurrentJob != nullptr && mCurrentJob->state == Job::RUNNING) { if (mCurrentJob != nullptr && mCurrentJob->state == Job::RUNNING) { mCurrentJob->state = Job::PAUSED; mCurrentJob->state = Job::PAUSED; // Notify the client as a paused event. auto clientCallback = mCurrentJob->callback.lock(); if (clientCallback != nullptr) { clientCallback->onTranscodingPaused(mCurrentJob->key.second); } } } mResourceLost = true; mResourceLost = true; Loading Loading @@ -439,10 +453,14 @@ void TranscodingJobScheduler::onTopUidsChanged(const std::unordered_set<uid_t>& } } void TranscodingJobScheduler::onResourceAvailable() { void TranscodingJobScheduler::onResourceAvailable() { ALOGV("%s", __FUNCTION__); std::scoped_lock lock{mLock}; std::scoped_lock lock{mLock}; if (!mResourceLost) { return; } ALOGI("%s", __FUNCTION__); mResourceLost = false; mResourceLost = false; updateCurrentJob_l(); updateCurrentJob_l(); Loading
media/libmediatranscoding/TranscodingResourcePolicy.cpp 0 → 100644 +169 −0 Original line number Original line Diff line number Diff line /* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //#define LOG_NDEBUG 0 #define LOG_TAG "TranscodingResourcePolicy" #include <aidl/android/media/BnResourceObserver.h> #include <aidl/android/media/IResourceObserverService.h> #include <android/binder_manager.h> #include <android/binder_process.h> #include <binder/IServiceManager.h> #include <media/TranscodingResourcePolicy.h> #include <utils/Log.h> namespace android { using Status = ::ndk::ScopedAStatus; using ::aidl::android::media::BnResourceObserver; using ::aidl::android::media::IResourceObserverService; using ::aidl::android::media::MediaObservableEvent; using ::aidl::android::media::MediaObservableFilter; using ::aidl::android::media::MediaObservableParcel; using ::aidl::android::media::MediaObservableType; static std::string toString(const MediaObservableParcel& observable) { return "{" + ::aidl::android::media::toString(observable.type) + ", " + std::to_string(observable.value) + "}"; } struct TranscodingResourcePolicy::ResourceObserver : public BnResourceObserver { explicit ResourceObserver(TranscodingResourcePolicy* owner) : mOwner(owner), mPid(getpid()) {} // IResourceObserver ::ndk::ScopedAStatus onStatusChanged( MediaObservableEvent event, int32_t uid, int32_t pid, const std::vector<MediaObservableParcel>& observables) override { ALOGD("%s: %s, uid %d, pid %d, %s", __FUNCTION__, ::aidl::android::media::toString(event).c_str(), uid, pid, toString(observables[0]).c_str()); // Only report kIdle event for codec resources from other processes. if (((uint64_t)event & (uint64_t)MediaObservableEvent::kIdle) != 0 && (pid != mPid)) { for (auto& observable : observables) { if (observable.type == MediaObservableType::kVideoSecureCodec || observable.type == MediaObservableType::kVideoNonSecureCodec) { mOwner->onResourceAvailable(); break; } } } return ::ndk::ScopedAStatus::ok(); } TranscodingResourcePolicy* mOwner; const pid_t mPid; }; // static void TranscodingResourcePolicy::BinderDiedCallback(void* cookie) { TranscodingResourcePolicy* owner = reinterpret_cast<TranscodingResourcePolicy*>(cookie); if (owner != nullptr) { owner->unregisterSelf(); } // TODO(chz): retry to connecting to IResourceObserverService after failure. // Also need to have back-up logic if IResourceObserverService is offline for // Prolonged period of time. A possible alternative could be, during period where // IResourceObserverService is not available, trigger onResourceAvailable() everytime // when top uid changes (in hope that'll free up some codec instances that we could // reclaim). } TranscodingResourcePolicy::TranscodingResourcePolicy() : mRegistered(false), mDeathRecipient(AIBinder_DeathRecipient_new(BinderDiedCallback)) { registerSelf(); } TranscodingResourcePolicy::~TranscodingResourcePolicy() { unregisterSelf(); } void TranscodingResourcePolicy::registerSelf() { ALOGI("TranscodingResourcePolicy: registerSelf"); ::ndk::SpAIBinder binder(AServiceManager_getService("media.resource_observer")); std::scoped_lock lock{mRegisteredLock}; if (mRegistered) { return; } // TODO(chz): retry to connecting to IResourceObserverService after failure. mService = IResourceObserverService::fromBinder(binder); if (mService == nullptr) { ALOGE("Failed to get IResourceObserverService"); return; } // Only register filters for codec resource available. mObserver = ::ndk::SharedRefBase::make<ResourceObserver>(this); std::vector<MediaObservableFilter> filters = { {MediaObservableType::kVideoSecureCodec, MediaObservableEvent::kIdle}, {MediaObservableType::kVideoNonSecureCodec, MediaObservableEvent::kIdle}}; Status status = mService->registerObserver(mObserver, filters); if (!status.isOk()) { ALOGE("failed to register: error %d", status.getServiceSpecificError()); mService = nullptr; mObserver = nullptr; return; } AIBinder_linkToDeath(binder.get(), mDeathRecipient.get(), reinterpret_cast<void*>(this)); ALOGD("@@@ registered observer"); mRegistered = true; } void TranscodingResourcePolicy::unregisterSelf() { ALOGI("TranscodingResourcePolicy: unregisterSelf"); std::scoped_lock lock{mRegisteredLock}; if (!mRegistered) { return; } ::ndk::SpAIBinder binder = mService->asBinder(); if (binder.get() != nullptr) { Status status = mService->unregisterObserver(mObserver); AIBinder_unlinkToDeath(binder.get(), mDeathRecipient.get(), reinterpret_cast<void*>(this)); } mService = nullptr; mObserver = nullptr; mRegistered = false; } void TranscodingResourcePolicy::setCallback( const std::shared_ptr<ResourcePolicyCallbackInterface>& cb) { std::scoped_lock lock{mCallbackLock}; mResourcePolicyCallback = cb; } void TranscodingResourcePolicy::onResourceAvailable() { std::shared_ptr<ResourcePolicyCallbackInterface> cb; { std::scoped_lock lock{mCallbackLock}; cb = mResourcePolicyCallback.lock(); } if (cb != nullptr) { cb->onResourceAvailable(); } } } // namespace android
media/libmediatranscoding/include/media/ResourcePolicyInterface.h 0 → 100644 +48 −0 Original line number Original line Diff line number Diff line /* * Copyright (C) 2020 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef ANDROID_MEDIA_RESOURCE_POLICY_INTERFACE_H #define ANDROID_MEDIA_RESOURCE_POLICY_INTERFACE_H #include <memory> namespace android { class ResourcePolicyCallbackInterface; // Interface for the JobScheduler to control the resource status updates. class ResourcePolicyInterface { public: // Set the associated callback interface to send the events when resource // status changes. (Set to nullptr will stop the updates.) virtual void setCallback(const std::shared_ptr<ResourcePolicyCallbackInterface>& cb) = 0; protected: virtual ~ResourcePolicyInterface() = default; }; // Interface for notifying the JobScheduler of a change in resource status. class ResourcePolicyCallbackInterface { public: // Called when codec resources become available. The scheduler may use this // as a signal to attempt restart transcoding jobs that were previously // paused due to temporary resource loss. virtual void onResourceAvailable() = 0; protected: virtual ~ResourcePolicyCallbackInterface() = default; }; } // namespace android #endif // ANDROID_MEDIA_RESOURCE_POLICY_INTERFACE_H