Loading media/libstagefright/httplive/LiveSession.cpp +250 −30 Original line number Diff line number Diff line Loading @@ -40,6 +40,8 @@ #include <media/stagefright/MetaData.h> #include <media/stagefright/Utils.h> #include <utils/Mutex.h> #include <ctype.h> #include <openssl/aes.h> #include <openssl/md5.h> Loading @@ -56,10 +58,14 @@ LiveSession::LiveSession( mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), mPrevBandwidthIndex(-1), mStreamMask(0), mNewStreamMask(0), mSwapMask(0), mCheckBandwidthGeneration(0), mSwitchGeneration(0), mLastDequeuedTimeUs(0ll), mRealTimeBaseUs(0ll), mReconfigurationInProgress(false), mSwitchInProgress(false), mDisconnectReplyID(0) { mStreams[kAudioIndex] = StreamItem("audio"); Loading @@ -68,16 +74,37 @@ LiveSession::LiveSession( for (size_t i = 0; i < kMaxStreams; ++i) { mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); } } LiveSession::~LiveSession() { } sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { ABuffer *discontinuity = new ABuffer(0); discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); discontinuity->meta()->setInt32("swapPacketSource", swap); discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); discontinuity->meta()->setInt64("timeUs", -1); return discontinuity; } void LiveSession::swapPacketSource(StreamType stream) { sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); sp<AnotherPacketSource> tmp = aps; aps = aps2; aps2 = tmp; aps2->clear(); } status_t LiveSession::dequeueAccessUnit( StreamType stream, sp<ABuffer> *accessUnit) { if (!(mStreamMask & stream)) { return UNKNOWN_ERROR; // return -EWOULDBLOCK to avoid halting the decoder // when switching between audio/video and audio only. return -EWOULDBLOCK; } sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); Loading Loading @@ -117,6 +144,25 @@ status_t LiveSession::dequeueAccessUnit( streamStr, type, extra == NULL ? "NULL" : extra->debugString().c_str()); int32_t swap; if (type == ATSParser::DISCONTINUITY_FORMATCHANGE && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { int32_t switchGeneration; CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); { Mutex::Autolock lock(mSwapMutex); if (switchGeneration == mSwitchGeneration) { swapPacketSource(stream); sp<AMessage> msg = new AMessage(kWhatSwapped, id()); msg->setInt32("stream", stream); msg->setInt32("switchGeneration", switchGeneration); msg->post(); } } } } else if (err == OK) { if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { int64_t timeUs; Loading @@ -138,6 +184,7 @@ status_t LiveSession::dequeueAccessUnit( } status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. if (!(mStreamMask & stream)) { return UNKNOWN_ERROR; } Loading Loading @@ -234,7 +281,12 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { if (what == PlaylistFetcher::kWhatStopped) { AString uri; CHECK(msg->findString("uri", &uri)); mFetcherInfos.removeItem(uri); if (mFetcherInfos.removeItem(uri) < 0) { // ignore duplicated kWhatStopped messages. break; } tryToFinishBandwidthSwitch(); } if (mContinuation != NULL) { Loading Loading @@ -270,6 +322,8 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { postPrepared(err); } cancelBandwidthSwitch(); mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); Loading Loading @@ -308,6 +362,27 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case PlaylistFetcher::kWhatStartedAt: { int32_t switchGeneration; CHECK(msg->findInt32("switchGeneration", &switchGeneration)); if (switchGeneration != mSwitchGeneration) { break; } // Resume fetcher for the original variant; the resumed fetcher should // continue until the timestamps found in msg, which is stored by the // new fetcher to indicate where the new variant has started buffering. for (size_t i = 0; i < mFetcherInfos.size(); i++) { const FetcherInfo info = mFetcherInfos.valueAt(i); if (info.mToBeRemoved) { info.mFetcher->resumeUntilAsync(msg); } } break; } default: TRESPASS(); } Loading Loading @@ -352,6 +427,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case kWhatSwapped: { onSwapped(msg); break; } default: TRESPASS(); break; Loading Loading @@ -462,6 +542,10 @@ void LiveSession::finishDisconnect() { // during disconnection either. cancelCheckBandwidthEvent(); // Protect mPacketSources from a swapPacketSource race condition through disconnect. // (finishDisconnect, onFinishDisconnect2) cancelBandwidthSwitch(); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); } Loading Loading @@ -501,11 +585,13 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); notify->setString("uri", uri); notify->setInt32("switchGeneration", mSwitchGeneration); FetcherInfo info; info.mFetcher = new PlaylistFetcher(notify, this, uri); info.mDurationUs = -1ll; info.mIsPrepared = false; info.mToBeRemoved = false; looper()->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); Loading Loading @@ -845,8 +931,25 @@ status_t LiveSession::selectTrack(size_t index, bool select) { return err; } bool LiveSession::canSwitchUp() { // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. status_t err = OK; for (size_t i = 0; i < mPacketSources.size(); ++i) { sp<AnotherPacketSource> source = mPacketSources.valueAt(i); int64_t dur = source->getBufferedDurationUs(&err); if (err == OK && dur > 10000000) { return true; } } return false; } void LiveSession::changeConfiguration( int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). cancelBandwidthSwitch(); CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; Loading @@ -862,7 +965,8 @@ void LiveSession::changeConfiguration( CHECK_LT(bandwidthIndex, mBandwidthItems.size()); const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); uint32_t streamMask = 0; uint32_t streamMask = 0; // streams that should be fetched by the new fetcher uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher AString URIs[kMaxStreams]; for (size_t i = 0; i < kMaxStreams; ++i) { Loading @@ -880,9 +984,14 @@ void LiveSession::changeConfiguration( // If we're seeking all current fetchers are discarded. if (timeUs < 0ll) { for (size_t j = 0; j < kMaxStreams; ++j) { if ((streamMask & indexToType(j)) && uri == URIs[j]) { // delay fetcher removal discardFetcher = false; for (size_t j = 0; j < kMaxStreams; ++j) { StreamType type = indexToType(j); if ((streamMask & type) && uri == URIs[j]) { resumeMask |= type; streamMask &= ~type; } } } Loading @@ -894,8 +1003,15 @@ void LiveSession::changeConfiguration( } } sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id()); sp<AMessage> msg; if (timeUs < 0ll) { // skip onChangeConfiguration2 (decoder destruction) if switching. msg = new AMessage(kWhatChangeConfiguration3, id()); } else { msg = new AMessage(kWhatChangeConfiguration2, id()); } msg->setInt32("streamMask", streamMask); msg->setInt32("resumeMask", resumeMask); msg->setInt64("timeUs", timeUs); for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { Loading Loading @@ -978,11 +1094,13 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { } void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { mContinuation.clear(); // All remaining fetchers are still suspended, the player has shutdown // any decoders that needed it. uint32_t streamMask; uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { Loading @@ -991,37 +1109,39 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { } int64_t timeUs; bool switching = false; CHECK(msg->findInt64("timeUs", &timeUs)); if (timeUs < 0ll) { timeUs = mLastDequeuedTimeUs; switching = true; } mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; mStreamMask = streamMask; mNewStreamMask = streamMask; // Resume all existing fetchers and assign them packet sources. // Of all existing fetchers: // * Resume fetchers that are still needed and assign them original packet sources. // * Mark otherwise unneeded fetchers for removal. ALOGV("resuming fetchers for mask 0x%08x", resumeMask); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { const AString &uri = mFetcherInfos.keyAt(i); uint32_t resumeMask = 0; sp<AnotherPacketSource> sources[kMaxStreams]; for (size_t j = 0; j < kMaxStreams; ++j) { if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); resumeMask |= indexToType(j); } } CHECK_NE(resumeMask, 0u); ALOGV("resuming fetchers for mask 0x%08x", resumeMask); streamMask &= ~resumeMask; mFetcherInfos.valueAt(i).mFetcher->startAsync( FetcherInfo &info = mFetcherInfos.editValueAt(i); if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL || sources[kSubtitleIndex] != NULL) { info.mFetcher->startAsync( sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); } else { info.mToBeRemoved = true; } } // streamMask now only contains the types that need a new fetcher created. Loading @@ -1030,6 +1150,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { ALOGV("creating new fetchers for mask 0x%08x", streamMask); } // Find out when the original fetchers have buffered up to and start the new fetchers // at a later timestamp. for (size_t i = 0; i < kMaxStreams; i++) { if (!(indexToType(i) & streamMask)) { continue; Loading @@ -1041,12 +1163,40 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); CHECK(fetcher != NULL); int32_t latestSeq = -1; int64_t latestTimeUs = 0ll; sp<AnotherPacketSource> sources[kMaxStreams]; // TRICKY: looping from i as earlier streams are already removed from streamMask for (size_t j = i; j < kMaxStreams; ++j) { if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); if (!switching) { sources[j]->clear(); } else { int32_t type, seq; int64_t srcTimeUs; sp<AMessage> meta = sources[j]->getLatestMeta(); if (meta != NULL && !meta->findInt32("discontinuity", &type)) { CHECK(meta->findInt32("seq", &seq)); if (seq > latestSeq) { latestSeq = seq; } CHECK(meta->findInt64("timeUs", &srcTimeUs)); if (srcTimeUs > latestTimeUs) { latestTimeUs = srcTimeUs; } } sources[j] = mPacketSources2.valueFor(indexToType(j)); sources[j]->clear(); uint32_t extraStreams = mNewStreamMask & (~mStreamMask); if (extraStreams & indexToType(j)) { sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false)); } } streamMask &= ~indexToType(j); } Loading @@ -1056,7 +1206,9 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], timeUs); timeUs, latestTimeUs /* min start time(us) */, latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ ); } // All fetchers have now been started, the configuration change Loading @@ -1065,14 +1217,61 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { scheduleCheckBandwidthEvent(); ALOGV("XXX configuration change completed."); mReconfigurationInProgress = false; if (switching) { mSwitchInProgress = true; } else { mStreamMask = mNewStreamMask; } if (mDisconnectReplyID != 0) { finishDisconnect(); } } void LiveSession::onSwapped(const sp<AMessage> &msg) { int32_t switchGeneration; CHECK(msg->findInt32("switchGeneration", &switchGeneration)); if (switchGeneration != mSwitchGeneration) { return; } int32_t stream; CHECK(msg->findInt32("stream", &stream)); mSwapMask |= stream; if (mSwapMask != mStreamMask) { return; } // Check if new variant contains extra streams. uint32_t extraStreams = mNewStreamMask & (~mStreamMask); while (extraStreams) { StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); swapPacketSource(extraStream); extraStreams &= ~extraStream; } tryToFinishBandwidthSwitch(); } // Mark switch done when: // 1. all old buffers are swapped out, AND // 2. all old fetchers are removed. void LiveSession::tryToFinishBandwidthSwitch() { bool needToRemoveFetchers = false; for (size_t i = 0; i < mFetcherInfos.size(); ++i) { if (mFetcherInfos.valueAt(i).mToBeRemoved) { needToRemoveFetchers = true; break; } } if (!needToRemoveFetchers && mSwapMask == mStreamMask) { mStreamMask = mNewStreamMask; mSwitchInProgress = false; mSwapMask = 0; } } void LiveSession::scheduleCheckBandwidthEvent() { sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); msg->setInt32("generation", mCheckBandwidthGeneration); Loading @@ -1083,16 +1282,37 @@ void LiveSession::cancelCheckBandwidthEvent() { ++mCheckBandwidthGeneration; } void LiveSession::onCheckBandwidth() { if (mReconfigurationInProgress) { scheduleCheckBandwidthEvent(); return; void LiveSession::cancelBandwidthSwitch() { Mutex::Autolock lock(mSwapMutex); mSwitchGeneration++; mSwitchInProgress = false; mSwapMask = 0; } bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { if (mReconfigurationInProgress || mSwitchInProgress) { return false; } if (mPrevBandwidthIndex < 0) { return true; } if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { return false; } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { return canSwitchUp(); } else { return true; } } void LiveSession::onCheckBandwidth() { size_t bandwidthIndex = getBandwidthIndex(); if (mPrevBandwidthIndex < 0 || bandwidthIndex != (size_t)mPrevBandwidthIndex) { if (canSwitchBandwidthTo(bandwidthIndex)) { changeConfiguration(-1ll /* timeUs */, bandwidthIndex); } else { scheduleCheckBandwidthEvent(); } // Handling the kWhatCheckBandwidth even here does _not_ automatically Loading media/libstagefright/httplive/LiveSession.h +36 −0 Original line number Diff line number Diff line Loading @@ -83,6 +83,11 @@ struct LiveSession : public AHandler { kWhatPreparationFailed, }; // create a format-change discontinuity // // swap: // whether is format-change discontinuity should trigger a buffer swap sp<ABuffer> createFormatChangeBuffer(bool swap = true); protected: virtual ~LiveSession(); Loading @@ -101,6 +106,7 @@ private: kWhatChangeConfiguration2 = 'chC2', kWhatChangeConfiguration3 = 'chC3', kWhatFinishDisconnect2 = 'fin2', kWhatSwapped = 'swap', }; struct BandwidthItem { Loading @@ -112,6 +118,7 @@ private: sp<PlaylistFetcher> mFetcher; int64_t mDurationUs; bool mIsPrepared; bool mToBeRemoved; }; struct StreamItem { Loading Loading @@ -146,9 +153,26 @@ private: KeyedVector<AString, FetcherInfo> mFetcherInfos; uint32_t mStreamMask; // Masks used during reconfiguration: // mNewStreamMask: streams in the variant playlist we're switching to; // we don't want to immediately overwrite the original value. uint32_t mNewStreamMask; // mSwapMask: streams that have started to playback content in the new variant playlist; // we use this to track reconfiguration progress. uint32_t mSwapMask; KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources; // A second set of packet sources that buffer content for the variant we're switching to. KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2; // A mutex used to serialize two sets of events: // * the swapping of packet sources in dequeueAccessUnit on the player thread, AND // * a forced bandwidth switch termination in cancelSwitch on the live looper. Mutex mSwapMutex; int32_t mCheckBandwidthGeneration; int32_t mSwitchGeneration; size_t mContinuationCounter; sp<AMessage> mContinuation; Loading @@ -157,6 +181,7 @@ private: int64_t mRealTimeBaseUs; bool mReconfigurationInProgress; bool mSwitchInProgress; uint32_t mDisconnectReplyID; sp<PlaylistFetcher> addFetcher(const char *uri); Loading Loading @@ -199,16 +224,27 @@ private: void onChangeConfiguration(const sp<AMessage> &msg); void onChangeConfiguration2(const sp<AMessage> &msg); void onChangeConfiguration3(const sp<AMessage> &msg); void onSwapped(const sp<AMessage> &msg); void tryToFinishBandwidthSwitch(); void scheduleCheckBandwidthEvent(); void cancelCheckBandwidthEvent(); // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources // from being swapped out on stale discontinuities while manipulating // mPacketSources/mPacketSources2. void cancelBandwidthSwitch(); bool canSwitchBandwidthTo(size_t bandwidthIndex); void onCheckBandwidth(); void finishDisconnect(); void postPrepared(status_t err); void swapPacketSource(StreamType stream); bool canSwitchUp(); DISALLOW_EVIL_CONSTRUCTORS(LiveSession); }; Loading media/libstagefright/httplive/PlaylistFetcher.cpp +234 −17 File changed.Preview size limit exceeded, changes collapsed. Show changes media/libstagefright/httplive/PlaylistFetcher.h +21 −1 Original line number Diff line number Diff line Loading @@ -43,6 +43,7 @@ struct PlaylistFetcher : public AHandler { kWhatTemporarilyDoneFetching, kWhatPrepared, kWhatPreparationFailed, kWhatStartedAt, }; PlaylistFetcher( Loading @@ -56,12 +57,16 @@ struct PlaylistFetcher : public AHandler { const sp<AnotherPacketSource> &audioSource, const sp<AnotherPacketSource> &videoSource, const sp<AnotherPacketSource> &subtitleSource, int64_t startTimeUs = -1ll); int64_t startTimeUs = -1ll, int64_t minStartTimeUs = 0ll /* start after this timestamp */, int32_t startSeqNumberHint = -1 /* try starting at this sequence number */); void pauseAsync(); void stopAsync(); void resumeUntilAsync(const sp<AMessage> ¶ms); protected: virtual ~PlaylistFetcher(); virtual void onMessageReceived(const sp<AMessage> &msg); Loading @@ -76,17 +81,25 @@ private: kWhatPause = 'paus', kWhatStop = 'stop', kWhatMonitorQueue = 'moni', kWhatResumeUntil = 'rsme', kWhatDownloadNext = 'dlnx', }; static const int64_t kMinBufferedDurationUs; static const int64_t kMaxMonitorDelayUs; static const int32_t kNumSkipFrames; // notifications to mSession sp<AMessage> mNotify; sp<AMessage> mStartTimeUsNotify; sp<LiveSession> mSession; AString mURI; uint32_t mStreamTypeMask; int64_t mStartTimeUs; int64_t mMinStartTimeUs; // start fetching no earlier than this value sp<AMessage> mStopParams; // message containing the latest timestamps we should fetch. KeyedVector<LiveSession::StreamType, sp<AnotherPacketSource> > mPacketSources; Loading Loading @@ -153,6 +166,9 @@ private: void onMonitorQueue(); void onDownloadNext(); // Resume a fetcher to continue until the stopping point stored in msg. status_t onResumeUntil(const sp<AMessage> &msg); status_t extractAndQueueAccessUnits( const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta); Loading @@ -165,6 +181,10 @@ private: void updateDuration(); // Before resuming a fetcher in onResume, check the remaining duration is longer than that // returned by resumeThreshold. int64_t resumeThreshold(const sp<AMessage> &msg); DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher); }; Loading Loading
media/libstagefright/httplive/LiveSession.cpp +250 −30 Original line number Diff line number Diff line Loading @@ -40,6 +40,8 @@ #include <media/stagefright/MetaData.h> #include <media/stagefright/Utils.h> #include <utils/Mutex.h> #include <ctype.h> #include <openssl/aes.h> #include <openssl/md5.h> Loading @@ -56,10 +58,14 @@ LiveSession::LiveSession( mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())), mPrevBandwidthIndex(-1), mStreamMask(0), mNewStreamMask(0), mSwapMask(0), mCheckBandwidthGeneration(0), mSwitchGeneration(0), mLastDequeuedTimeUs(0ll), mRealTimeBaseUs(0ll), mReconfigurationInProgress(false), mSwitchInProgress(false), mDisconnectReplyID(0) { mStreams[kAudioIndex] = StreamItem("audio"); Loading @@ -68,16 +74,37 @@ LiveSession::LiveSession( for (size_t i = 0; i < kMaxStreams; ++i) { mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); } } LiveSession::~LiveSession() { } sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { ABuffer *discontinuity = new ABuffer(0); discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE); discontinuity->meta()->setInt32("swapPacketSource", swap); discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration); discontinuity->meta()->setInt64("timeUs", -1); return discontinuity; } void LiveSession::swapPacketSource(StreamType stream) { sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream); sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream); sp<AnotherPacketSource> tmp = aps; aps = aps2; aps2 = tmp; aps2->clear(); } status_t LiveSession::dequeueAccessUnit( StreamType stream, sp<ABuffer> *accessUnit) { if (!(mStreamMask & stream)) { return UNKNOWN_ERROR; // return -EWOULDBLOCK to avoid halting the decoder // when switching between audio/video and audio only. return -EWOULDBLOCK; } sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); Loading Loading @@ -117,6 +144,25 @@ status_t LiveSession::dequeueAccessUnit( streamStr, type, extra == NULL ? "NULL" : extra->debugString().c_str()); int32_t swap; if (type == ATSParser::DISCONTINUITY_FORMATCHANGE && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) { int32_t switchGeneration; CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration)); { Mutex::Autolock lock(mSwapMutex); if (switchGeneration == mSwitchGeneration) { swapPacketSource(stream); sp<AMessage> msg = new AMessage(kWhatSwapped, id()); msg->setInt32("stream", stream); msg->setInt32("switchGeneration", switchGeneration); msg->post(); } } } } else if (err == OK) { if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) { int64_t timeUs; Loading @@ -138,6 +184,7 @@ status_t LiveSession::dequeueAccessUnit( } status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) { // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit. if (!(mStreamMask & stream)) { return UNKNOWN_ERROR; } Loading Loading @@ -234,7 +281,12 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { if (what == PlaylistFetcher::kWhatStopped) { AString uri; CHECK(msg->findString("uri", &uri)); mFetcherInfos.removeItem(uri); if (mFetcherInfos.removeItem(uri) < 0) { // ignore duplicated kWhatStopped messages. break; } tryToFinishBandwidthSwitch(); } if (mContinuation != NULL) { Loading Loading @@ -270,6 +322,8 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { postPrepared(err); } cancelBandwidthSwitch(); mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err); mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err); Loading Loading @@ -308,6 +362,27 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case PlaylistFetcher::kWhatStartedAt: { int32_t switchGeneration; CHECK(msg->findInt32("switchGeneration", &switchGeneration)); if (switchGeneration != mSwitchGeneration) { break; } // Resume fetcher for the original variant; the resumed fetcher should // continue until the timestamps found in msg, which is stored by the // new fetcher to indicate where the new variant has started buffering. for (size_t i = 0; i < mFetcherInfos.size(); i++) { const FetcherInfo info = mFetcherInfos.valueAt(i); if (info.mToBeRemoved) { info.mFetcher->resumeUntilAsync(msg); } } break; } default: TRESPASS(); } Loading Loading @@ -352,6 +427,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case kWhatSwapped: { onSwapped(msg); break; } default: TRESPASS(); break; Loading Loading @@ -462,6 +542,10 @@ void LiveSession::finishDisconnect() { // during disconnection either. cancelCheckBandwidthEvent(); // Protect mPacketSources from a swapPacketSource race condition through disconnect. // (finishDisconnect, onFinishDisconnect2) cancelBandwidthSwitch(); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); } Loading Loading @@ -501,11 +585,13 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id()); notify->setString("uri", uri); notify->setInt32("switchGeneration", mSwitchGeneration); FetcherInfo info; info.mFetcher = new PlaylistFetcher(notify, this, uri); info.mDurationUs = -1ll; info.mIsPrepared = false; info.mToBeRemoved = false; looper()->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); Loading Loading @@ -845,8 +931,25 @@ status_t LiveSession::selectTrack(size_t index, bool select) { return err; } bool LiveSession::canSwitchUp() { // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. status_t err = OK; for (size_t i = 0; i < mPacketSources.size(); ++i) { sp<AnotherPacketSource> source = mPacketSources.valueAt(i); int64_t dur = source->getBufferedDurationUs(&err); if (err == OK && dur > 10000000) { return true; } } return false; } void LiveSession::changeConfiguration( int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3). cancelBandwidthSwitch(); CHECK(!mReconfigurationInProgress); mReconfigurationInProgress = true; Loading @@ -862,7 +965,8 @@ void LiveSession::changeConfiguration( CHECK_LT(bandwidthIndex, mBandwidthItems.size()); const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex); uint32_t streamMask = 0; uint32_t streamMask = 0; // streams that should be fetched by the new fetcher uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher AString URIs[kMaxStreams]; for (size_t i = 0; i < kMaxStreams; ++i) { Loading @@ -880,9 +984,14 @@ void LiveSession::changeConfiguration( // If we're seeking all current fetchers are discarded. if (timeUs < 0ll) { for (size_t j = 0; j < kMaxStreams; ++j) { if ((streamMask & indexToType(j)) && uri == URIs[j]) { // delay fetcher removal discardFetcher = false; for (size_t j = 0; j < kMaxStreams; ++j) { StreamType type = indexToType(j); if ((streamMask & type) && uri == URIs[j]) { resumeMask |= type; streamMask &= ~type; } } } Loading @@ -894,8 +1003,15 @@ void LiveSession::changeConfiguration( } } sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id()); sp<AMessage> msg; if (timeUs < 0ll) { // skip onChangeConfiguration2 (decoder destruction) if switching. msg = new AMessage(kWhatChangeConfiguration3, id()); } else { msg = new AMessage(kWhatChangeConfiguration2, id()); } msg->setInt32("streamMask", streamMask); msg->setInt32("resumeMask", resumeMask); msg->setInt64("timeUs", timeUs); for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { Loading Loading @@ -978,11 +1094,13 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { } void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { mContinuation.clear(); // All remaining fetchers are still suspended, the player has shutdown // any decoders that needed it. uint32_t streamMask; uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); for (size_t i = 0; i < kMaxStreams; ++i) { if (streamMask & indexToType(i)) { Loading @@ -991,37 +1109,39 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { } int64_t timeUs; bool switching = false; CHECK(msg->findInt64("timeUs", &timeUs)); if (timeUs < 0ll) { timeUs = mLastDequeuedTimeUs; switching = true; } mRealTimeBaseUs = ALooper::GetNowUs() - timeUs; mStreamMask = streamMask; mNewStreamMask = streamMask; // Resume all existing fetchers and assign them packet sources. // Of all existing fetchers: // * Resume fetchers that are still needed and assign them original packet sources. // * Mark otherwise unneeded fetchers for removal. ALOGV("resuming fetchers for mask 0x%08x", resumeMask); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { const AString &uri = mFetcherInfos.keyAt(i); uint32_t resumeMask = 0; sp<AnotherPacketSource> sources[kMaxStreams]; for (size_t j = 0; j < kMaxStreams; ++j) { if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); resumeMask |= indexToType(j); } } CHECK_NE(resumeMask, 0u); ALOGV("resuming fetchers for mask 0x%08x", resumeMask); streamMask &= ~resumeMask; mFetcherInfos.valueAt(i).mFetcher->startAsync( FetcherInfo &info = mFetcherInfos.editValueAt(i); if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL || sources[kSubtitleIndex] != NULL) { info.mFetcher->startAsync( sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]); } else { info.mToBeRemoved = true; } } // streamMask now only contains the types that need a new fetcher created. Loading @@ -1030,6 +1150,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { ALOGV("creating new fetchers for mask 0x%08x", streamMask); } // Find out when the original fetchers have buffered up to and start the new fetchers // at a later timestamp. for (size_t i = 0; i < kMaxStreams; i++) { if (!(indexToType(i) & streamMask)) { continue; Loading @@ -1041,12 +1163,40 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str()); CHECK(fetcher != NULL); int32_t latestSeq = -1; int64_t latestTimeUs = 0ll; sp<AnotherPacketSource> sources[kMaxStreams]; // TRICKY: looping from i as earlier streams are already removed from streamMask for (size_t j = i; j < kMaxStreams; ++j) { if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); if (!switching) { sources[j]->clear(); } else { int32_t type, seq; int64_t srcTimeUs; sp<AMessage> meta = sources[j]->getLatestMeta(); if (meta != NULL && !meta->findInt32("discontinuity", &type)) { CHECK(meta->findInt32("seq", &seq)); if (seq > latestSeq) { latestSeq = seq; } CHECK(meta->findInt64("timeUs", &srcTimeUs)); if (srcTimeUs > latestTimeUs) { latestTimeUs = srcTimeUs; } } sources[j] = mPacketSources2.valueFor(indexToType(j)); sources[j]->clear(); uint32_t extraStreams = mNewStreamMask & (~mStreamMask); if (extraStreams & indexToType(j)) { sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false)); } } streamMask &= ~indexToType(j); } Loading @@ -1056,7 +1206,9 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], timeUs); timeUs, latestTimeUs /* min start time(us) */, latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ ); } // All fetchers have now been started, the configuration change Loading @@ -1065,14 +1217,61 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { scheduleCheckBandwidthEvent(); ALOGV("XXX configuration change completed."); mReconfigurationInProgress = false; if (switching) { mSwitchInProgress = true; } else { mStreamMask = mNewStreamMask; } if (mDisconnectReplyID != 0) { finishDisconnect(); } } void LiveSession::onSwapped(const sp<AMessage> &msg) { int32_t switchGeneration; CHECK(msg->findInt32("switchGeneration", &switchGeneration)); if (switchGeneration != mSwitchGeneration) { return; } int32_t stream; CHECK(msg->findInt32("stream", &stream)); mSwapMask |= stream; if (mSwapMask != mStreamMask) { return; } // Check if new variant contains extra streams. uint32_t extraStreams = mNewStreamMask & (~mStreamMask); while (extraStreams) { StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1)); swapPacketSource(extraStream); extraStreams &= ~extraStream; } tryToFinishBandwidthSwitch(); } // Mark switch done when: // 1. all old buffers are swapped out, AND // 2. all old fetchers are removed. void LiveSession::tryToFinishBandwidthSwitch() { bool needToRemoveFetchers = false; for (size_t i = 0; i < mFetcherInfos.size(); ++i) { if (mFetcherInfos.valueAt(i).mToBeRemoved) { needToRemoveFetchers = true; break; } } if (!needToRemoveFetchers && mSwapMask == mStreamMask) { mStreamMask = mNewStreamMask; mSwitchInProgress = false; mSwapMask = 0; } } void LiveSession::scheduleCheckBandwidthEvent() { sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id()); msg->setInt32("generation", mCheckBandwidthGeneration); Loading @@ -1083,16 +1282,37 @@ void LiveSession::cancelCheckBandwidthEvent() { ++mCheckBandwidthGeneration; } void LiveSession::onCheckBandwidth() { if (mReconfigurationInProgress) { scheduleCheckBandwidthEvent(); return; void LiveSession::cancelBandwidthSwitch() { Mutex::Autolock lock(mSwapMutex); mSwitchGeneration++; mSwitchInProgress = false; mSwapMask = 0; } bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { if (mReconfigurationInProgress || mSwitchInProgress) { return false; } if (mPrevBandwidthIndex < 0) { return true; } if (bandwidthIndex == (size_t)mPrevBandwidthIndex) { return false; } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) { return canSwitchUp(); } else { return true; } } void LiveSession::onCheckBandwidth() { size_t bandwidthIndex = getBandwidthIndex(); if (mPrevBandwidthIndex < 0 || bandwidthIndex != (size_t)mPrevBandwidthIndex) { if (canSwitchBandwidthTo(bandwidthIndex)) { changeConfiguration(-1ll /* timeUs */, bandwidthIndex); } else { scheduleCheckBandwidthEvent(); } // Handling the kWhatCheckBandwidth even here does _not_ automatically Loading
media/libstagefright/httplive/LiveSession.h +36 −0 Original line number Diff line number Diff line Loading @@ -83,6 +83,11 @@ struct LiveSession : public AHandler { kWhatPreparationFailed, }; // create a format-change discontinuity // // swap: // whether is format-change discontinuity should trigger a buffer swap sp<ABuffer> createFormatChangeBuffer(bool swap = true); protected: virtual ~LiveSession(); Loading @@ -101,6 +106,7 @@ private: kWhatChangeConfiguration2 = 'chC2', kWhatChangeConfiguration3 = 'chC3', kWhatFinishDisconnect2 = 'fin2', kWhatSwapped = 'swap', }; struct BandwidthItem { Loading @@ -112,6 +118,7 @@ private: sp<PlaylistFetcher> mFetcher; int64_t mDurationUs; bool mIsPrepared; bool mToBeRemoved; }; struct StreamItem { Loading Loading @@ -146,9 +153,26 @@ private: KeyedVector<AString, FetcherInfo> mFetcherInfos; uint32_t mStreamMask; // Masks used during reconfiguration: // mNewStreamMask: streams in the variant playlist we're switching to; // we don't want to immediately overwrite the original value. uint32_t mNewStreamMask; // mSwapMask: streams that have started to playback content in the new variant playlist; // we use this to track reconfiguration progress. uint32_t mSwapMask; KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources; // A second set of packet sources that buffer content for the variant we're switching to. KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2; // A mutex used to serialize two sets of events: // * the swapping of packet sources in dequeueAccessUnit on the player thread, AND // * a forced bandwidth switch termination in cancelSwitch on the live looper. Mutex mSwapMutex; int32_t mCheckBandwidthGeneration; int32_t mSwitchGeneration; size_t mContinuationCounter; sp<AMessage> mContinuation; Loading @@ -157,6 +181,7 @@ private: int64_t mRealTimeBaseUs; bool mReconfigurationInProgress; bool mSwitchInProgress; uint32_t mDisconnectReplyID; sp<PlaylistFetcher> addFetcher(const char *uri); Loading Loading @@ -199,16 +224,27 @@ private: void onChangeConfiguration(const sp<AMessage> &msg); void onChangeConfiguration2(const sp<AMessage> &msg); void onChangeConfiguration3(const sp<AMessage> &msg); void onSwapped(const sp<AMessage> &msg); void tryToFinishBandwidthSwitch(); void scheduleCheckBandwidthEvent(); void cancelCheckBandwidthEvent(); // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources // from being swapped out on stale discontinuities while manipulating // mPacketSources/mPacketSources2. void cancelBandwidthSwitch(); bool canSwitchBandwidthTo(size_t bandwidthIndex); void onCheckBandwidth(); void finishDisconnect(); void postPrepared(status_t err); void swapPacketSource(StreamType stream); bool canSwitchUp(); DISALLOW_EVIL_CONSTRUCTORS(LiveSession); }; Loading
media/libstagefright/httplive/PlaylistFetcher.cpp +234 −17 File changed.Preview size limit exceeded, changes collapsed. Show changes
media/libstagefright/httplive/PlaylistFetcher.h +21 −1 Original line number Diff line number Diff line Loading @@ -43,6 +43,7 @@ struct PlaylistFetcher : public AHandler { kWhatTemporarilyDoneFetching, kWhatPrepared, kWhatPreparationFailed, kWhatStartedAt, }; PlaylistFetcher( Loading @@ -56,12 +57,16 @@ struct PlaylistFetcher : public AHandler { const sp<AnotherPacketSource> &audioSource, const sp<AnotherPacketSource> &videoSource, const sp<AnotherPacketSource> &subtitleSource, int64_t startTimeUs = -1ll); int64_t startTimeUs = -1ll, int64_t minStartTimeUs = 0ll /* start after this timestamp */, int32_t startSeqNumberHint = -1 /* try starting at this sequence number */); void pauseAsync(); void stopAsync(); void resumeUntilAsync(const sp<AMessage> ¶ms); protected: virtual ~PlaylistFetcher(); virtual void onMessageReceived(const sp<AMessage> &msg); Loading @@ -76,17 +81,25 @@ private: kWhatPause = 'paus', kWhatStop = 'stop', kWhatMonitorQueue = 'moni', kWhatResumeUntil = 'rsme', kWhatDownloadNext = 'dlnx', }; static const int64_t kMinBufferedDurationUs; static const int64_t kMaxMonitorDelayUs; static const int32_t kNumSkipFrames; // notifications to mSession sp<AMessage> mNotify; sp<AMessage> mStartTimeUsNotify; sp<LiveSession> mSession; AString mURI; uint32_t mStreamTypeMask; int64_t mStartTimeUs; int64_t mMinStartTimeUs; // start fetching no earlier than this value sp<AMessage> mStopParams; // message containing the latest timestamps we should fetch. KeyedVector<LiveSession::StreamType, sp<AnotherPacketSource> > mPacketSources; Loading Loading @@ -153,6 +166,9 @@ private: void onMonitorQueue(); void onDownloadNext(); // Resume a fetcher to continue until the stopping point stored in msg. status_t onResumeUntil(const sp<AMessage> &msg); status_t extractAndQueueAccessUnits( const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta); Loading @@ -165,6 +181,10 @@ private: void updateDuration(); // Before resuming a fetcher in onResume, check the remaining duration is longer than that // returned by resumeThreshold. int64_t resumeThreshold(const sp<AMessage> &msg); DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher); }; Loading