Loading media/libmediatranscoding/TranscodingSessionController.cpp +92 −34 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ namespace android { static_assert((SessionIdType)-1 < 0, "SessionIdType should be signed"); constexpr static uid_t OFFLINE_UID = -1; constexpr static size_t kSessionHistoryMax = 100; //static String8 TranscodingSessionController::sessionToString(const SessionKeyType& sessionKey) { Loading @@ -47,6 +48,12 @@ const char* TranscodingSessionController::sessionStateToString(const Session::St return "RUNNING"; case Session::State::PAUSED: return "PAUSED"; case Session::State::FINISHED: return "FINISHED"; case Session::State::CANCELED: return "CANCELED"; case Session::State::ERROR: return "ERROR"; default: break; } Loading @@ -71,6 +78,30 @@ TranscodingSessionController::TranscodingSessionController( TranscodingSessionController::~TranscodingSessionController() {} void TranscodingSessionController::dumpSession_l(const Session& session, String8& result, bool closedSession) { const size_t SIZE = 256; char buffer[SIZE]; const TranscodingRequestParcel& request = session.request; snprintf(buffer, SIZE, " Session: %s, %s, %d%%\n", sessionToString(session.key).c_str(), sessionStateToString(session.getState()), session.lastProgress); result.append(buffer); snprintf(buffer, SIZE, " pkg: %s\n", request.clientPackageName.c_str()); result.append(buffer); snprintf(buffer, SIZE, " src: %s\n", request.sourceFilePath.c_str()); result.append(buffer); snprintf(buffer, SIZE, " dst: %s\n", request.destinationFilePath.c_str()); result.append(buffer); if (closedSession) { snprintf(buffer, SIZE, " waiting: %.1fs, running: %.1fs, paused: %.1fs, paused count: %d\n", session.waitingTime.count() / 1000000.0f, session.runningTime.count() / 1000000.0f, session.pausedTime.count() / 1000000.0f, session.pauseCount); result.append(buffer); } } void TranscodingSessionController::dumpAllSessions(int fd, const Vector<String16>& args __unused) { String8 result; Loading @@ -78,7 +109,7 @@ void TranscodingSessionController::dumpAllSessions(int fd, const Vector<String16 char buffer[SIZE]; std::scoped_lock lock{mLock}; snprintf(buffer, SIZE, "\n========== Dumping all sessions queues =========\n"); snprintf(buffer, SIZE, "\n========== Dumping live sessions queues =========\n"); result.append(buffer); snprintf(buffer, SIZE, " Total num of Sessions: %zu\n", mSessionMap.size()); result.append(buffer); Loading @@ -91,7 +122,7 @@ void TranscodingSessionController::dumpAllSessions(int fd, const Vector<String16 if (mSessionQueues[uid].empty()) { continue; } snprintf(buffer, SIZE, " Uid: %d, pkg: %s\n", uid, snprintf(buffer, SIZE, " uid: %d, pkg: %s\n", uid, mUidPackageNames.count(uid) > 0 ? mUidPackageNames[uid].c_str() : "(unknown)"); result.append(buffer); snprintf(buffer, SIZE, " Num of sessions: %zu\n", mSessionQueues[uid].size()); Loading @@ -104,23 +135,14 @@ void TranscodingSessionController::dumpAllSessions(int fd, const Vector<String16 result.append(buffer); continue; } Session& session = sessionIt->second; TranscodingRequestParcel& request = session.request; snprintf(buffer, SIZE, " Session: %s, %s, %d%%\n", sessionToString(sessionKey).c_str(), sessionStateToString(session.state), session.lastProgress); result.append(buffer); snprintf(buffer, SIZE, " Src: %s\n", request.sourceFilePath.c_str()); result.append(buffer); snprintf(buffer, SIZE, " Dst: %s\n", request.destinationFilePath.c_str()); result.append(buffer); // For the offline queue, print out the original client. if (uid == OFFLINE_UID) { snprintf(buffer, SIZE, " Original Client: %s\n", request.clientPackageName.c_str()); result.append(buffer); dumpSession_l(sessionIt->second, result); } } snprintf(buffer, SIZE, "\n========== Dumping past sessions =========\n"); result.append(buffer); for (auto &session : mSessionHistory) { dumpSession_l(session, result, true /*closedSession*/); } write(fd, result.string(), result.size()); Loading @@ -135,6 +157,34 @@ TranscodingSessionController::Session* TranscodingSessionController::getTopSessi return &mSessionMap[topSessionKey]; } void TranscodingSessionController::Session::setState(Session::State newState) { if (state == newState) { return; } auto nowTime = std::chrono::system_clock::now(); if (state != INVALID) { std::chrono::microseconds elapsedTime = (nowTime - stateEnterTime); switch (state) { case PAUSED: pausedTime = pausedTime + elapsedTime; break; case RUNNING: runningTime = runningTime + elapsedTime; break; case NOT_STARTED: waitingTime = waitingTime + elapsedTime; break; default: break; } } if (newState == PAUSED) { pauseCount++; } stateEnterTime = nowTime; state = newState; } void TranscodingSessionController::updateCurrentSession_l() { Session* topSession = getTopSession_l(); Session* curSession = mCurrentSession; Loading @@ -145,29 +195,30 @@ void TranscodingSessionController::updateCurrentSession_l() { // If we found a topSession that should be run, and it's not already running, // take some actions to ensure it's running. if (topSession != nullptr && (topSession != curSession || topSession->state != Session::RUNNING)) { (topSession != curSession || topSession->getState() != Session::RUNNING)) { // If another session is currently running, pause it first. if (curSession != nullptr && curSession->state == Session::RUNNING) { if (curSession != nullptr && curSession->getState() == Session::RUNNING) { mTranscoder->pause(curSession->key.first, curSession->key.second); curSession->state = Session::PAUSED; curSession->setState(Session::PAUSED); } // If we are not experiencing resource loss, we can start or resume // the topSession now. if (!mResourceLost) { if (topSession->state == Session::NOT_STARTED) { if (topSession->getState() == Session::NOT_STARTED) { mTranscoder->start(topSession->key.first, topSession->key.second, topSession->request, topSession->callback.lock()); } else if (topSession->state == Session::PAUSED) { } else if (topSession->getState() == Session::PAUSED) { mTranscoder->resume(topSession->key.first, topSession->key.second, topSession->request, topSession->callback.lock()); } topSession->state = Session::RUNNING; topSession->setState(Session::RUNNING); } } mCurrentSession = topSession; } void TranscodingSessionController::removeSession_l(const SessionKeyType& sessionKey) { void TranscodingSessionController::removeSession_l(const SessionKeyType& sessionKey, Session::State finalState) { ALOGV("%s: session %s", __FUNCTION__, sessionToString(sessionKey).c_str()); if (mSessionMap.count(sessionKey) == 0) { Loading Loading @@ -201,6 +252,12 @@ void TranscodingSessionController::removeSession_l(const SessionKeyType& session mCurrentSession = nullptr; } mSessionMap[sessionKey].setState(finalState); mSessionHistory.push_back(mSessionMap[sessionKey]); if (mSessionHistory.size() > kSessionHistoryMax) { mSessionHistory.erase(mSessionHistory.begin()); } // Remove session from session map. mSessionMap.erase(sessionKey); } Loading Loading @@ -288,10 +345,11 @@ bool TranscodingSessionController::submit( // Add session to session map. mSessionMap[sessionKey].key = sessionKey; mSessionMap[sessionKey].uid = uid; mSessionMap[sessionKey].state = Session::NOT_STARTED; mSessionMap[sessionKey].lastProgress = 0; mSessionMap[sessionKey].pauseCount = 0; mSessionMap[sessionKey].request = request; mSessionMap[sessionKey].callback = callback; mSessionMap[sessionKey].setState(Session::NOT_STARTED); // If it's an offline session, the queue was already added in constructor. // If it's a real-time sessions, check if a queue is already present for the uid, Loading Loading @@ -350,12 +408,12 @@ bool TranscodingSessionController::cancel(ClientIdType clientId, SessionIdType s // Note that stop() is needed even if the session is currently paused. This instructs // the transcoder to discard any states for the session, otherwise the states may // never be discarded. if (mSessionMap[*it].state != Session::NOT_STARTED) { if (mSessionMap[*it].getState() != Session::NOT_STARTED) { mTranscoder->stop(it->first, it->second); } // Remove the session. removeSession_l(*it); removeSession_l(*it, Session::CANCELED); } // Start next session. Loading Loading @@ -396,7 +454,7 @@ void TranscodingSessionController::notifyClient(ClientIdType clientId, SessionId // Only ignore if session was never started. In particular, propagate the status // to client if the session is paused. Transcoder could have posted finish when // we're pausing it, and the finish arrived after we changed current session. if (mSessionMap[sessionKey].state == Session::NOT_STARTED) { if (mSessionMap[sessionKey].getState() == Session::NOT_STARTED) { ALOGW("%s: ignoring %s for session %s that was never started", __FUNCTION__, reason, sessionToString(sessionKey).c_str()); return; Loading Loading @@ -445,7 +503,7 @@ void TranscodingSessionController::onFinish(ClientIdType clientId, SessionIdType } // Remove the session. removeSession_l(sessionKey); removeSession_l(sessionKey, Session::FINISHED); // Start next session. updateCurrentSession_l(); Loading @@ -465,7 +523,7 @@ void TranscodingSessionController::onError(ClientIdType clientId, SessionIdType } // Remove the session. removeSession_l(sessionKey); removeSession_l(sessionKey, Session::ERROR); // Start next session. updateCurrentSession_l(); Loading Loading @@ -494,7 +552,7 @@ void TranscodingSessionController::onResourceLost(ClientIdType clientId, Session } Session* resourceLostSession = &mSessionMap[sessionKey]; if (resourceLostSession->state != Session::RUNNING) { if (resourceLostSession->getState() != Session::RUNNING) { ALOGW("session %s lost resource but is no longer running", sessionToString(sessionKey).c_str()); return; Loading @@ -502,7 +560,7 @@ void TranscodingSessionController::onResourceLost(ClientIdType clientId, Session // If we receive a resource loss event, the transcoder already paused the transcoding, // so we don't need to call onPaused() to pause it. However, we still need to notify // the client and update the session state here. resourceLostSession->state = Session::PAUSED; resourceLostSession->setState(Session::PAUSED); // Notify the client as a paused event. auto clientCallback = resourceLostSession->callback.lock(); if (clientCallback != nullptr) { Loading media/libmediatranscoding/include/media/TranscodingSessionController.h +25 −5 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <utils/String8.h> #include <utils/Vector.h> #include <chrono> #include <list> #include <map> #include <mutex> Loading Loading @@ -82,16 +83,33 @@ private: using SessionQueueType = std::list<SessionKeyType>; struct Session { SessionKeyType key; uid_t uid; enum State { NOT_STARTED, INVALID = -1, NOT_STARTED = 0, RUNNING, PAUSED, } state; FINISHED, CANCELED, ERROR, }; SessionKeyType key; uid_t uid; int32_t lastProgress; int32_t pauseCount; std::chrono::time_point<std::chrono::system_clock> stateEnterTime; std::chrono::microseconds waitingTime; std::chrono::microseconds runningTime; std::chrono::microseconds pausedTime; TranscodingRequest request; std::weak_ptr<ITranscodingClientCallback> callback; // Must use setState to change state. void setState(Session::State state); State getState() const { return state; } private: State state = INVALID; }; // TODO(chz): call transcoder without global lock. Loading @@ -115,15 +133,17 @@ private: Session* mCurrentSession; bool mResourceLost; std::list<Session> mSessionHistory; // Only allow MediaTranscodingService and unit tests to instantiate. TranscodingSessionController(const std::shared_ptr<TranscoderInterface>& transcoder, const std::shared_ptr<UidPolicyInterface>& uidPolicy, const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy); void dumpSession_l(const Session& session, String8& result, bool closedSession = false); Session* getTopSession_l(); void updateCurrentSession_l(); void removeSession_l(const SessionKeyType& sessionKey); void removeSession_l(const SessionKeyType& sessionKey, Session::State finalState); void moveUidsToTop_l(const std::unordered_set<uid_t>& uids, bool preserveTopUid); void notifyClient(ClientIdType clientId, SessionIdType sessionId, const char* reason, std::function<void(const SessionKeyType&)> func); Loading Loading
media/libmediatranscoding/TranscodingSessionController.cpp +92 −34 Original line number Diff line number Diff line Loading @@ -31,6 +31,7 @@ namespace android { static_assert((SessionIdType)-1 < 0, "SessionIdType should be signed"); constexpr static uid_t OFFLINE_UID = -1; constexpr static size_t kSessionHistoryMax = 100; //static String8 TranscodingSessionController::sessionToString(const SessionKeyType& sessionKey) { Loading @@ -47,6 +48,12 @@ const char* TranscodingSessionController::sessionStateToString(const Session::St return "RUNNING"; case Session::State::PAUSED: return "PAUSED"; case Session::State::FINISHED: return "FINISHED"; case Session::State::CANCELED: return "CANCELED"; case Session::State::ERROR: return "ERROR"; default: break; } Loading @@ -71,6 +78,30 @@ TranscodingSessionController::TranscodingSessionController( TranscodingSessionController::~TranscodingSessionController() {} void TranscodingSessionController::dumpSession_l(const Session& session, String8& result, bool closedSession) { const size_t SIZE = 256; char buffer[SIZE]; const TranscodingRequestParcel& request = session.request; snprintf(buffer, SIZE, " Session: %s, %s, %d%%\n", sessionToString(session.key).c_str(), sessionStateToString(session.getState()), session.lastProgress); result.append(buffer); snprintf(buffer, SIZE, " pkg: %s\n", request.clientPackageName.c_str()); result.append(buffer); snprintf(buffer, SIZE, " src: %s\n", request.sourceFilePath.c_str()); result.append(buffer); snprintf(buffer, SIZE, " dst: %s\n", request.destinationFilePath.c_str()); result.append(buffer); if (closedSession) { snprintf(buffer, SIZE, " waiting: %.1fs, running: %.1fs, paused: %.1fs, paused count: %d\n", session.waitingTime.count() / 1000000.0f, session.runningTime.count() / 1000000.0f, session.pausedTime.count() / 1000000.0f, session.pauseCount); result.append(buffer); } } void TranscodingSessionController::dumpAllSessions(int fd, const Vector<String16>& args __unused) { String8 result; Loading @@ -78,7 +109,7 @@ void TranscodingSessionController::dumpAllSessions(int fd, const Vector<String16 char buffer[SIZE]; std::scoped_lock lock{mLock}; snprintf(buffer, SIZE, "\n========== Dumping all sessions queues =========\n"); snprintf(buffer, SIZE, "\n========== Dumping live sessions queues =========\n"); result.append(buffer); snprintf(buffer, SIZE, " Total num of Sessions: %zu\n", mSessionMap.size()); result.append(buffer); Loading @@ -91,7 +122,7 @@ void TranscodingSessionController::dumpAllSessions(int fd, const Vector<String16 if (mSessionQueues[uid].empty()) { continue; } snprintf(buffer, SIZE, " Uid: %d, pkg: %s\n", uid, snprintf(buffer, SIZE, " uid: %d, pkg: %s\n", uid, mUidPackageNames.count(uid) > 0 ? mUidPackageNames[uid].c_str() : "(unknown)"); result.append(buffer); snprintf(buffer, SIZE, " Num of sessions: %zu\n", mSessionQueues[uid].size()); Loading @@ -104,23 +135,14 @@ void TranscodingSessionController::dumpAllSessions(int fd, const Vector<String16 result.append(buffer); continue; } Session& session = sessionIt->second; TranscodingRequestParcel& request = session.request; snprintf(buffer, SIZE, " Session: %s, %s, %d%%\n", sessionToString(sessionKey).c_str(), sessionStateToString(session.state), session.lastProgress); result.append(buffer); snprintf(buffer, SIZE, " Src: %s\n", request.sourceFilePath.c_str()); result.append(buffer); snprintf(buffer, SIZE, " Dst: %s\n", request.destinationFilePath.c_str()); result.append(buffer); // For the offline queue, print out the original client. if (uid == OFFLINE_UID) { snprintf(buffer, SIZE, " Original Client: %s\n", request.clientPackageName.c_str()); result.append(buffer); dumpSession_l(sessionIt->second, result); } } snprintf(buffer, SIZE, "\n========== Dumping past sessions =========\n"); result.append(buffer); for (auto &session : mSessionHistory) { dumpSession_l(session, result, true /*closedSession*/); } write(fd, result.string(), result.size()); Loading @@ -135,6 +157,34 @@ TranscodingSessionController::Session* TranscodingSessionController::getTopSessi return &mSessionMap[topSessionKey]; } void TranscodingSessionController::Session::setState(Session::State newState) { if (state == newState) { return; } auto nowTime = std::chrono::system_clock::now(); if (state != INVALID) { std::chrono::microseconds elapsedTime = (nowTime - stateEnterTime); switch (state) { case PAUSED: pausedTime = pausedTime + elapsedTime; break; case RUNNING: runningTime = runningTime + elapsedTime; break; case NOT_STARTED: waitingTime = waitingTime + elapsedTime; break; default: break; } } if (newState == PAUSED) { pauseCount++; } stateEnterTime = nowTime; state = newState; } void TranscodingSessionController::updateCurrentSession_l() { Session* topSession = getTopSession_l(); Session* curSession = mCurrentSession; Loading @@ -145,29 +195,30 @@ void TranscodingSessionController::updateCurrentSession_l() { // If we found a topSession that should be run, and it's not already running, // take some actions to ensure it's running. if (topSession != nullptr && (topSession != curSession || topSession->state != Session::RUNNING)) { (topSession != curSession || topSession->getState() != Session::RUNNING)) { // If another session is currently running, pause it first. if (curSession != nullptr && curSession->state == Session::RUNNING) { if (curSession != nullptr && curSession->getState() == Session::RUNNING) { mTranscoder->pause(curSession->key.first, curSession->key.second); curSession->state = Session::PAUSED; curSession->setState(Session::PAUSED); } // If we are not experiencing resource loss, we can start or resume // the topSession now. if (!mResourceLost) { if (topSession->state == Session::NOT_STARTED) { if (topSession->getState() == Session::NOT_STARTED) { mTranscoder->start(topSession->key.first, topSession->key.second, topSession->request, topSession->callback.lock()); } else if (topSession->state == Session::PAUSED) { } else if (topSession->getState() == Session::PAUSED) { mTranscoder->resume(topSession->key.first, topSession->key.second, topSession->request, topSession->callback.lock()); } topSession->state = Session::RUNNING; topSession->setState(Session::RUNNING); } } mCurrentSession = topSession; } void TranscodingSessionController::removeSession_l(const SessionKeyType& sessionKey) { void TranscodingSessionController::removeSession_l(const SessionKeyType& sessionKey, Session::State finalState) { ALOGV("%s: session %s", __FUNCTION__, sessionToString(sessionKey).c_str()); if (mSessionMap.count(sessionKey) == 0) { Loading Loading @@ -201,6 +252,12 @@ void TranscodingSessionController::removeSession_l(const SessionKeyType& session mCurrentSession = nullptr; } mSessionMap[sessionKey].setState(finalState); mSessionHistory.push_back(mSessionMap[sessionKey]); if (mSessionHistory.size() > kSessionHistoryMax) { mSessionHistory.erase(mSessionHistory.begin()); } // Remove session from session map. mSessionMap.erase(sessionKey); } Loading Loading @@ -288,10 +345,11 @@ bool TranscodingSessionController::submit( // Add session to session map. mSessionMap[sessionKey].key = sessionKey; mSessionMap[sessionKey].uid = uid; mSessionMap[sessionKey].state = Session::NOT_STARTED; mSessionMap[sessionKey].lastProgress = 0; mSessionMap[sessionKey].pauseCount = 0; mSessionMap[sessionKey].request = request; mSessionMap[sessionKey].callback = callback; mSessionMap[sessionKey].setState(Session::NOT_STARTED); // If it's an offline session, the queue was already added in constructor. // If it's a real-time sessions, check if a queue is already present for the uid, Loading Loading @@ -350,12 +408,12 @@ bool TranscodingSessionController::cancel(ClientIdType clientId, SessionIdType s // Note that stop() is needed even if the session is currently paused. This instructs // the transcoder to discard any states for the session, otherwise the states may // never be discarded. if (mSessionMap[*it].state != Session::NOT_STARTED) { if (mSessionMap[*it].getState() != Session::NOT_STARTED) { mTranscoder->stop(it->first, it->second); } // Remove the session. removeSession_l(*it); removeSession_l(*it, Session::CANCELED); } // Start next session. Loading Loading @@ -396,7 +454,7 @@ void TranscodingSessionController::notifyClient(ClientIdType clientId, SessionId // Only ignore if session was never started. In particular, propagate the status // to client if the session is paused. Transcoder could have posted finish when // we're pausing it, and the finish arrived after we changed current session. if (mSessionMap[sessionKey].state == Session::NOT_STARTED) { if (mSessionMap[sessionKey].getState() == Session::NOT_STARTED) { ALOGW("%s: ignoring %s for session %s that was never started", __FUNCTION__, reason, sessionToString(sessionKey).c_str()); return; Loading Loading @@ -445,7 +503,7 @@ void TranscodingSessionController::onFinish(ClientIdType clientId, SessionIdType } // Remove the session. removeSession_l(sessionKey); removeSession_l(sessionKey, Session::FINISHED); // Start next session. updateCurrentSession_l(); Loading @@ -465,7 +523,7 @@ void TranscodingSessionController::onError(ClientIdType clientId, SessionIdType } // Remove the session. removeSession_l(sessionKey); removeSession_l(sessionKey, Session::ERROR); // Start next session. updateCurrentSession_l(); Loading Loading @@ -494,7 +552,7 @@ void TranscodingSessionController::onResourceLost(ClientIdType clientId, Session } Session* resourceLostSession = &mSessionMap[sessionKey]; if (resourceLostSession->state != Session::RUNNING) { if (resourceLostSession->getState() != Session::RUNNING) { ALOGW("session %s lost resource but is no longer running", sessionToString(sessionKey).c_str()); return; Loading @@ -502,7 +560,7 @@ void TranscodingSessionController::onResourceLost(ClientIdType clientId, Session // If we receive a resource loss event, the transcoder already paused the transcoding, // so we don't need to call onPaused() to pause it. However, we still need to notify // the client and update the session state here. resourceLostSession->state = Session::PAUSED; resourceLostSession->setState(Session::PAUSED); // Notify the client as a paused event. auto clientCallback = resourceLostSession->callback.lock(); if (clientCallback != nullptr) { Loading
media/libmediatranscoding/include/media/TranscodingSessionController.h +25 −5 Original line number Diff line number Diff line Loading @@ -26,6 +26,7 @@ #include <utils/String8.h> #include <utils/Vector.h> #include <chrono> #include <list> #include <map> #include <mutex> Loading Loading @@ -82,16 +83,33 @@ private: using SessionQueueType = std::list<SessionKeyType>; struct Session { SessionKeyType key; uid_t uid; enum State { NOT_STARTED, INVALID = -1, NOT_STARTED = 0, RUNNING, PAUSED, } state; FINISHED, CANCELED, ERROR, }; SessionKeyType key; uid_t uid; int32_t lastProgress; int32_t pauseCount; std::chrono::time_point<std::chrono::system_clock> stateEnterTime; std::chrono::microseconds waitingTime; std::chrono::microseconds runningTime; std::chrono::microseconds pausedTime; TranscodingRequest request; std::weak_ptr<ITranscodingClientCallback> callback; // Must use setState to change state. void setState(Session::State state); State getState() const { return state; } private: State state = INVALID; }; // TODO(chz): call transcoder without global lock. Loading @@ -115,15 +133,17 @@ private: Session* mCurrentSession; bool mResourceLost; std::list<Session> mSessionHistory; // Only allow MediaTranscodingService and unit tests to instantiate. TranscodingSessionController(const std::shared_ptr<TranscoderInterface>& transcoder, const std::shared_ptr<UidPolicyInterface>& uidPolicy, const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy); void dumpSession_l(const Session& session, String8& result, bool closedSession = false); Session* getTopSession_l(); void updateCurrentSession_l(); void removeSession_l(const SessionKeyType& sessionKey); void removeSession_l(const SessionKeyType& sessionKey, Session::State finalState); void moveUidsToTop_l(const std::unordered_set<uid_t>& uids, bool preserveTopUid); void notifyClient(ClientIdType clientId, SessionIdType sessionId, const char* reason, std::function<void(const SessionKeyType&)> func); Loading