Loading media/libstagefright/httplive/LiveSession.cpp +147 −196 Original line number Diff line number Diff line Loading @@ -49,8 +49,13 @@ namespace android { // static // Number of recently-read bytes to use for bandwidth estimation const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024; // High water mark to start up switch or report prepared) const int64_t LiveSession::kHighWaterMark = 8000000ll; const int64_t LiveSession::kMidWaterMark = 5000000ll; const int64_t LiveSession::kLowWaterMark = 3000000ll; LiveSession::LiveSession( const sp<AMessage> ¬ify, uint32_t flags, Loading @@ -75,14 +80,14 @@ LiveSession::LiveSession( mSeekReplyID(0), mFirstTimeUsValid(false), mFirstTimeUs(0), mLastSeekTimeUs(0) { mLastSeekTimeUs(0), mPollBufferingGeneration(0) { mStreams[kAudioIndex] = StreamItem("audio"); mStreams[kVideoIndex] = StreamItem("video"); mStreams[kSubtitleIndex] = StreamItem("subtitles"); for (size_t i = 0; i < kMaxStreams; ++i) { mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mBuffering[i] = false; Loading @@ -97,6 +102,9 @@ LiveSession::LiveSession( } LiveSession::~LiveSession() { if (mFetcherLooper != NULL) { mFetcherLooper->stop(); } } sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { Loading Loading @@ -125,24 +133,7 @@ status_t LiveSession::dequeueAccessUnit( return -EWOULDBLOCK; } status_t finalResult; sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); if (discontinuityQueue->hasBufferAvailable(&finalResult)) { discontinuityQueue->dequeueAccessUnit(accessUnit); // seeking, track switching sp<AMessage> extra; int64_t timeUs; if ((*accessUnit)->meta()->findMessage("extra", &extra) && extra != NULL && extra->findInt64("timeUs", &timeUs)) { // seeking only mLastSeekTimeUs = timeUs; mDiscontinuityOffsetTimesUs.clear(); mDiscontinuityAbsStartTimesUs.clear(); } return INFO_DISCONTINUITY; } status_t finalResult = OK; sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); ssize_t idx = typeToIndex(stream); Loading Loading @@ -172,7 +163,7 @@ status_t LiveSession::dequeueAccessUnit( if (mBuffering[idx]) { if (mSwitchInProgress || packetSource->isFinished(0) || packetSource->getEstimatedDurationUs() > targetDurationUs) { || packetSource->hasBufferAvailable(&finalResult)) { mBuffering[idx] = false; } } Loading Loading @@ -429,11 +420,16 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { if (what == PlaylistFetcher::kWhatStopped) { AString uri; CHECK(msg->findString("uri", &uri)); if (mFetcherInfos.removeItem(uri) < 0) { ssize_t index = mFetcherInfos.indexOfKey(uri); if (index < 0) { // ignore duplicated kWhatStopped messages. break; } mFetcherLooper->unregisterHandler( mFetcherInfos[index].mFetcher->id()); mFetcherInfos.removeItemsAt(index); if (mSwitchInProgress) { tryToFinishBandwidthSwitch(); } Loading @@ -443,14 +439,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { CHECK_GT(mContinuationCounter, 0); if (--mContinuationCounter == 0) { mContinuation->post(); if (mSeekReplyID != 0) { CHECK(mSeekReply != NULL); mSeekReply->setInt32("err", OK); mSeekReply->postReply(mSeekReplyID); mSeekReplyID = 0; mSeekReply.clear(); } } } break; Loading @@ -464,8 +452,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { int64_t durationUs; CHECK(msg->findInt64("durationUs", &durationUs)); ssize_t index = mFetcherInfos.indexOfKey(uri); if (index >= 0) { FetcherInfo *info = &mFetcherInfos.editValueFor(uri); info->mDurationUs = durationUs; } break; } Loading Loading @@ -513,34 +504,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case PlaylistFetcher::kWhatTemporarilyDoneFetching: { AString uri; CHECK(msg->findString("uri", &uri)); if (mFetcherInfos.indexOfKey(uri) < 0) { ALOGE("couldn't find uri"); break; } FetcherInfo *info = &mFetcherInfos.editValueFor(uri); info->mIsPrepared = true; if (mInPreparationPhase) { bool allFetchersPrepared = true; for (size_t i = 0; i < mFetcherInfos.size(); ++i) { if (!mFetcherInfos.valueAt(i).mIsPrepared) { allFetchersPrepared = false; break; } } if (allFetchersPrepared) { postPrepared(OK); } } break; } case PlaylistFetcher::kWhatStartedAt: { int32_t switchGeneration; Loading Loading @@ -569,19 +532,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case kWhatCheckBandwidth: { int32_t generation; CHECK(msg->findInt32("generation", &generation)); if (generation != mCheckBandwidthGeneration) { break; } onCheckBandwidth(msg); break; } case kWhatChangeConfiguration: { onChangeConfiguration(msg); Loading Loading @@ -612,15 +562,13 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case kWhatCheckSwitchDown: case kWhatPollBuffering: { onCheckSwitchDown(); break; int32_t generation; CHECK(msg->findInt32("generation", &generation)); if (generation == mPollBufferingGeneration) { onPollBuffering(); } case kWhatSwitchDown: { onSwitchDown(); break; } Loading Loading @@ -691,6 +639,14 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { return; } // create looper for fetchers if (mFetcherLooper == NULL) { mFetcherLooper = new ALooper(); mFetcherLooper->setName("Fetcher"); mFetcherLooper->start(false, false); } // We trust the content provider to make a reasonable choice of preferred // initial bandwidth by listing it first in the variant playlist. // At startup we really don't have a good estimate on the available Loading Loading @@ -739,19 +695,20 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { mPlaylist->pickRandomMediaItems(); changeConfiguration( 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); schedulePollBuffering(); } void LiveSession::finishDisconnect() { // No reconfiguration is currently pending, make sure none will trigger // during disconnection either. cancelCheckBandwidthEvent(); // Protect mPacketSources from a swapPacketSource race condition through disconnect. // (finishDisconnect, onFinishDisconnect2) cancelBandwidthSwitch(); // cancel switch down monitor mSwitchDownMonitor.clear(); // cancel buffer polling cancelPollBuffering(); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); Loading Loading @@ -799,7 +756,7 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { info.mDurationUs = -1ll; info.mIsPrepared = false; info.mToBeRemoved = false; looper()->registerHandler(info.mFetcher); mFetcherLooper->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); Loading Loading @@ -1201,19 +1158,6 @@ ssize_t LiveSession::getSelectedTrack(media_track_type type) const { } } bool LiveSession::canSwitchUp() { // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. status_t err = OK; for (size_t i = 0; i < mPacketSources.size(); ++i) { sp<AnotherPacketSource> source = mPacketSources.valueAt(i); int64_t dur = source->getBufferedDurationUs(&err); if (err == OK && dur > 10000000) { return true; } } return false; } void LiveSession::changeConfiguration( int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. Loading Loading @@ -1296,14 +1240,6 @@ void LiveSession::changeConfiguration( if (mContinuationCounter == 0) { msg->post(); if (mSeekReplyID != 0) { CHECK(mSeekReply != NULL); mSeekReply->setInt32("err", OK); mSeekReply->postReply(mSeekReplyID); mSeekReplyID = 0; mSeekReply.clear(); } } } Loading @@ -1323,6 +1259,30 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { // All fetchers are either suspended or have been removed now. // If we're seeking, clear all packet sources before we report // seek complete, to prevent decoder from pulling stale data. int64_t timeUs; CHECK(msg->findInt64("timeUs", &timeUs)); if (timeUs >= 0) { mLastSeekTimeUs = timeUs; for (size_t i = 0; i < mPacketSources.size(); i++) { mPacketSources.editValueAt(i)->clear(); } mDiscontinuityOffsetTimesUs.clear(); mDiscontinuityAbsStartTimesUs.clear(); if (mSeekReplyID != 0) { CHECK(mSeekReply != NULL); mSeekReply->setInt32("err", OK); mSeekReply->postReply(mSeekReplyID); mSeekReplyID = 0; mSeekReply.clear(); } } uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); Loading Loading @@ -1428,19 +1388,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { for (size_t j = 0; j < kMaxStreams; ++j) { if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); if (j != kSubtitleIndex) { ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); sp<AnotherPacketSource> discontinuityQueue; discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); discontinuityQueue->queueDiscontinuity( ATSParser::DISCONTINUITY_NONE, NULL, true); } } } FetcherInfo &info = mFetcherInfos.editValueAt(i); if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL || sources[kSubtitleIndex] != NULL) { Loading Loading @@ -1486,15 +1435,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sources[j] = mPacketSources.valueFor(indexToType(j)); if (timeUs >= 0) { sources[j]->clear(); startTimeUs = timeUs; sp<AnotherPacketSource> discontinuityQueue; sp<AMessage> extra = new AMessage; extra->setInt64("timeUs", timeUs); discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); discontinuityQueue->queueDiscontinuity( ATSParser::DISCONTINUITY_TIME, extra, true); } else { int32_t type; sp<AMessage> meta; Loading Loading @@ -1532,9 +1473,10 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { if (j == kSubtitleIndex) { break; } sp<AnotherPacketSource> discontinuityQueue; discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); discontinuityQueue->queueDiscontinuity( ALOGV("stream[%d]: queue format change", j); sources[j]->queueDiscontinuity( ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); } else { // adapting, queue discontinuities after resume Loading Loading @@ -1564,9 +1506,6 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { // All fetchers have now been started, the configuration change // has completed. cancelCheckBandwidthEvent(); scheduleCheckBandwidthEvent(); ALOGV("XXX configuration change completed."); mReconfigurationInProgress = false; if (switching) { Loading Loading @@ -1623,47 +1562,35 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) { tryToFinishBandwidthSwitch(); } void LiveSession::onCheckSwitchDown() { if (mSwitchDownMonitor == NULL) { return; void LiveSession::schedulePollBuffering() { sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); msg->setInt32("generation", mPollBufferingGeneration); msg->post(1000000ll); } if (mSwitchInProgress || mReconfigurationInProgress) { ALOGV("Switch/Reconfig in progress, defer switch down"); mSwitchDownMonitor->post(1000000ll); return; void LiveSession::cancelPollBuffering() { ++mPollBufferingGeneration; } for (size_t i = 0; i < kMaxStreams; ++i) { int32_t targetDuration; sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); void LiveSession::onPollBuffering() { ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " "mInPreparationPhase %d, mStreamMask 0x%x", mSwitchInProgress, mReconfigurationInProgress, mInPreparationPhase, mStreamMask); if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); int64_t targetDurationUs = targetDuration * 1000000ll; if (bufferedDurationUs < targetDurationUs / 3) { (new AMessage(kWhatSwitchDown, this))->post(); break; } } } mSwitchDownMonitor->post(1000000ll); bool low, mid, high; if (checkBuffering(low, mid, high)) { if (mInPreparationPhase && mid) { postPrepared(OK); } void LiveSession::onSwitchDown() { if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { return; // don't switch before we report prepared if (!mInPreparationPhase && (low || high)) { switchBandwidthIfNeeded(high); } ssize_t bandwidthIndex = getBandwidthIndex(); if (bandwidthIndex < mCurBandwidthIndex) { changeConfiguration(-1, bandwidthIndex, false); return; } schedulePollBuffering(); } // Mark switch done when: Loading @@ -1688,16 +1615,6 @@ void LiveSession::tryToFinishBandwidthSwitch() { } } void LiveSession::scheduleCheckBandwidthEvent() { sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, this); msg->setInt32("generation", mCheckBandwidthGeneration); msg->post(10000000ll); } void LiveSession::cancelCheckBandwidthEvent() { ++mCheckBandwidthGeneration; } void LiveSession::cancelBandwidthSwitch() { Mutex::Autolock lock(mSwapMutex); mSwitchGeneration++; Loading Loading @@ -1727,33 +1644,69 @@ void LiveSession::cancelBandwidthSwitch() { } } bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { if (mReconfigurationInProgress || mSwitchInProgress) { bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) { low = mid = high = false; if (mSwitchInProgress || mReconfigurationInProgress) { ALOGV("Switch/Reconfig in progress, defer buffer polling"); return false; } if (mCurBandwidthIndex < 0) { return true; // TODO: Fine tune low/high mark. // We also need to pause playback if buffering is too low. // Currently during underflow, we depend on decoder to starve // to pause, but A/V could have different buffering left, // they're not paused together. // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE // Switch down if any of the fetchers are below low mark; // Switch up if all of the fetchers are over high mark. size_t activeCount, lowCount, midCount, highCount; activeCount = lowCount = midCount = highCount = 0; for (size_t i = 0; i < mPacketSources.size(); ++i) { // we don't check subtitles for buffering level if (!(mStreamMask & mPacketSources.keyAt(i) & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { continue; } // ignore streams that never had any packet queued. // (it's possible that the variant only has audio or video) sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); if (meta == NULL) { continue; } if (bandwidthIndex == (size_t)mCurBandwidthIndex) { return false; } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { return canSwitchUp(); } else { ++activeCount; int64_t bufferedDurationUs = mPacketSources[i]->getEstimatedDurationUs(); ALOGV("source[%d]: buffered %lld us", i, bufferedDurationUs); if (bufferedDurationUs < kLowWaterMark) { ++lowCount; break; } else if (bufferedDurationUs > kHighWaterMark) { ++midCount; ++highCount; } else if (bufferedDurationUs > kMidWaterMark) { ++midCount; } } if (activeCount > 0) { high = (highCount == activeCount); mid = (midCount == activeCount); low = (lowCount > 0); return true; } return false; } void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { size_t bandwidthIndex = getBandwidthIndex(); if (canSwitchBandwidthTo(bandwidthIndex)) { changeConfiguration(-1ll /* timeUs */, bandwidthIndex); } else { // Come back and check again 10 seconds later in case there is nothing to do now. // If we DO change configuration, once that completes it'll schedule a new // check bandwidth event with an incremented mCheckBandwidthGeneration. msg->post(10000000ll); void LiveSession::switchBandwidthIfNeeded(bool canSwitchUp) { ssize_t bandwidthIndex = getBandwidthIndex(); if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) || (!canSwitchUp && bandwidthIndex < mCurBandwidthIndex)) { changeConfiguration(-1, bandwidthIndex, false); } } Loading @@ -1771,10 +1724,8 @@ void LiveSession::postPrepared(status_t err) { notify->post(); mInPreparationPhase = false; mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, this); mSwitchDownMonitor->post(); } } // namespace android media/libstagefright/httplive/LiveSession.h +18 −16 Original line number Diff line number Diff line Loading @@ -40,10 +40,6 @@ struct LiveSession : public AHandler { // Don't log any URLs. kFlagIncognito = 1, }; LiveSession( const sp<AMessage> ¬ify, uint32_t flags, const sp<IMediaHTTPService> &httpService); enum StreamIndex { kAudioIndex = 0, Loading @@ -57,6 +53,12 @@ struct LiveSession : public AHandler { STREAMTYPE_VIDEO = 1 << kVideoIndex, STREAMTYPE_SUBTITLES = 1 << kSubtitleIndex, }; LiveSession( const sp<AMessage> ¬ify, uint32_t flags, const sp<IMediaHTTPService> &httpService); status_t dequeueAccessUnit(StreamType stream, sp<ABuffer> *accessUnit); status_t getStreamFormat(StreamType stream, sp<AMessage> *format); Loading Loading @@ -110,11 +112,13 @@ private: kWhatChangeConfiguration3 = 'chC3', kWhatFinishDisconnect2 = 'fin2', kWhatSwapped = 'swap', kWhatCheckSwitchDown = 'ckSD', kWhatSwitchDown = 'sDwn', kWhatPollBuffering = 'poll', }; static const size_t kBandwidthHistoryBytes; static const int64_t kHighWaterMark; static const int64_t kMidWaterMark; static const int64_t kLowWaterMark; struct BandwidthItem { size_t mPlaylistIndex; Loading Loading @@ -169,6 +173,7 @@ private: sp<M3UParser> mPlaylist; sp<ALooper> mFetcherLooper; KeyedVector<AString, FetcherInfo> mFetcherInfos; uint32_t mStreamMask; Loading @@ -181,7 +186,6 @@ private: // we use this to track reconfiguration progress. uint32_t mSwapMask; KeyedVector<StreamType, sp<AnotherPacketSource> > mDiscontinuities; KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources; // A second set of packet sources that buffer content for the variant we're switching to. KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2; Loading Loading @@ -210,10 +214,11 @@ private: bool mFirstTimeUsValid; int64_t mFirstTimeUs; int64_t mLastSeekTimeUs; sp<AMessage> mSwitchDownMonitor; KeyedVector<size_t, int64_t> mDiscontinuityAbsStartTimesUs; KeyedVector<size_t, int64_t> mDiscontinuityOffsetTimesUs; int32_t mPollBufferingGeneration; sp<PlaylistFetcher> addFetcher(const char *uri); void onConnect(const sp<AMessage> &msg); Loading Loading @@ -257,27 +262,24 @@ private: void onChangeConfiguration2(const sp<AMessage> &msg); void onChangeConfiguration3(const sp<AMessage> &msg); void onSwapped(const sp<AMessage> &msg); void onCheckSwitchDown(); void onSwitchDown(); void tryToFinishBandwidthSwitch(); void scheduleCheckBandwidthEvent(); void cancelCheckBandwidthEvent(); // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources // from being swapped out on stale discontinuities while manipulating // mPacketSources/mPacketSources2. void cancelBandwidthSwitch(); bool canSwitchBandwidthTo(size_t bandwidthIndex); void onCheckBandwidth(const sp<AMessage> &msg); void schedulePollBuffering(); void cancelPollBuffering(); void onPollBuffering(); bool checkBuffering(bool &low, bool &mid, bool &high); void switchBandwidthIfNeeded(bool canSwitchUp); void finishDisconnect(); void postPrepared(status_t err); void swapPacketSource(StreamType stream); bool canSwitchUp(); DISALLOW_EVIL_CONSTRUCTORS(LiveSession); }; Loading media/libstagefright/httplive/PlaylistFetcher.cpp +15 −47 File changed.Preview size limit exceeded, changes collapsed. Show changes media/libstagefright/httplive/PlaylistFetcher.h +1 −5 Original line number Diff line number Diff line Loading @@ -36,6 +36,7 @@ class String8; struct PlaylistFetcher : public AHandler { static const int64_t kMinBufferedDurationUs; static const int32_t kDownloadBlockSize; static const int64_t kFetcherResumeThreshold; enum { kWhatStarted, Loading @@ -43,7 +44,6 @@ struct PlaylistFetcher : public AHandler { kWhatStopped, kWhatError, kWhatDurationUpdate, kWhatTemporarilyDoneFetching, kWhatPrepared, kWhatPreparationFailed, kWhatStartedAt, Loading Loading @@ -212,10 +212,6 @@ private: void updateDuration(); // Before resuming a fetcher in onResume, check the remaining duration is longer than that // returned by resumeThreshold. int64_t resumeThreshold(const sp<AMessage> &msg); DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher); }; Loading Loading
media/libstagefright/httplive/LiveSession.cpp +147 −196 Original line number Diff line number Diff line Loading @@ -49,8 +49,13 @@ namespace android { // static // Number of recently-read bytes to use for bandwidth estimation const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024; // High water mark to start up switch or report prepared) const int64_t LiveSession::kHighWaterMark = 8000000ll; const int64_t LiveSession::kMidWaterMark = 5000000ll; const int64_t LiveSession::kLowWaterMark = 3000000ll; LiveSession::LiveSession( const sp<AMessage> ¬ify, uint32_t flags, Loading @@ -75,14 +80,14 @@ LiveSession::LiveSession( mSeekReplyID(0), mFirstTimeUsValid(false), mFirstTimeUs(0), mLastSeekTimeUs(0) { mLastSeekTimeUs(0), mPollBufferingGeneration(0) { mStreams[kAudioIndex] = StreamItem("audio"); mStreams[kVideoIndex] = StreamItem("video"); mStreams[kSubtitleIndex] = StreamItem("subtitles"); for (size_t i = 0; i < kMaxStreams; ++i) { mDiscontinuities.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */)); mBuffering[i] = false; Loading @@ -97,6 +102,9 @@ LiveSession::LiveSession( } LiveSession::~LiveSession() { if (mFetcherLooper != NULL) { mFetcherLooper->stop(); } } sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) { Loading Loading @@ -125,24 +133,7 @@ status_t LiveSession::dequeueAccessUnit( return -EWOULDBLOCK; } status_t finalResult; sp<AnotherPacketSource> discontinuityQueue = mDiscontinuities.valueFor(stream); if (discontinuityQueue->hasBufferAvailable(&finalResult)) { discontinuityQueue->dequeueAccessUnit(accessUnit); // seeking, track switching sp<AMessage> extra; int64_t timeUs; if ((*accessUnit)->meta()->findMessage("extra", &extra) && extra != NULL && extra->findInt64("timeUs", &timeUs)) { // seeking only mLastSeekTimeUs = timeUs; mDiscontinuityOffsetTimesUs.clear(); mDiscontinuityAbsStartTimesUs.clear(); } return INFO_DISCONTINUITY; } status_t finalResult = OK; sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream); ssize_t idx = typeToIndex(stream); Loading Loading @@ -172,7 +163,7 @@ status_t LiveSession::dequeueAccessUnit( if (mBuffering[idx]) { if (mSwitchInProgress || packetSource->isFinished(0) || packetSource->getEstimatedDurationUs() > targetDurationUs) { || packetSource->hasBufferAvailable(&finalResult)) { mBuffering[idx] = false; } } Loading Loading @@ -429,11 +420,16 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { if (what == PlaylistFetcher::kWhatStopped) { AString uri; CHECK(msg->findString("uri", &uri)); if (mFetcherInfos.removeItem(uri) < 0) { ssize_t index = mFetcherInfos.indexOfKey(uri); if (index < 0) { // ignore duplicated kWhatStopped messages. break; } mFetcherLooper->unregisterHandler( mFetcherInfos[index].mFetcher->id()); mFetcherInfos.removeItemsAt(index); if (mSwitchInProgress) { tryToFinishBandwidthSwitch(); } Loading @@ -443,14 +439,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { CHECK_GT(mContinuationCounter, 0); if (--mContinuationCounter == 0) { mContinuation->post(); if (mSeekReplyID != 0) { CHECK(mSeekReply != NULL); mSeekReply->setInt32("err", OK); mSeekReply->postReply(mSeekReplyID); mSeekReplyID = 0; mSeekReply.clear(); } } } break; Loading @@ -464,8 +452,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { int64_t durationUs; CHECK(msg->findInt64("durationUs", &durationUs)); ssize_t index = mFetcherInfos.indexOfKey(uri); if (index >= 0) { FetcherInfo *info = &mFetcherInfos.editValueFor(uri); info->mDurationUs = durationUs; } break; } Loading Loading @@ -513,34 +504,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case PlaylistFetcher::kWhatTemporarilyDoneFetching: { AString uri; CHECK(msg->findString("uri", &uri)); if (mFetcherInfos.indexOfKey(uri) < 0) { ALOGE("couldn't find uri"); break; } FetcherInfo *info = &mFetcherInfos.editValueFor(uri); info->mIsPrepared = true; if (mInPreparationPhase) { bool allFetchersPrepared = true; for (size_t i = 0; i < mFetcherInfos.size(); ++i) { if (!mFetcherInfos.valueAt(i).mIsPrepared) { allFetchersPrepared = false; break; } } if (allFetchersPrepared) { postPrepared(OK); } } break; } case PlaylistFetcher::kWhatStartedAt: { int32_t switchGeneration; Loading Loading @@ -569,19 +532,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case kWhatCheckBandwidth: { int32_t generation; CHECK(msg->findInt32("generation", &generation)); if (generation != mCheckBandwidthGeneration) { break; } onCheckBandwidth(msg); break; } case kWhatChangeConfiguration: { onChangeConfiguration(msg); Loading Loading @@ -612,15 +562,13 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) { break; } case kWhatCheckSwitchDown: case kWhatPollBuffering: { onCheckSwitchDown(); break; int32_t generation; CHECK(msg->findInt32("generation", &generation)); if (generation == mPollBufferingGeneration) { onPollBuffering(); } case kWhatSwitchDown: { onSwitchDown(); break; } Loading Loading @@ -691,6 +639,14 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { return; } // create looper for fetchers if (mFetcherLooper == NULL) { mFetcherLooper = new ALooper(); mFetcherLooper->setName("Fetcher"); mFetcherLooper->start(false, false); } // We trust the content provider to make a reasonable choice of preferred // initial bandwidth by listing it first in the variant playlist. // At startup we really don't have a good estimate on the available Loading Loading @@ -739,19 +695,20 @@ void LiveSession::onConnect(const sp<AMessage> &msg) { mPlaylist->pickRandomMediaItems(); changeConfiguration( 0ll /* timeUs */, initialBandwidthIndex, false /* pickTrack */); schedulePollBuffering(); } void LiveSession::finishDisconnect() { // No reconfiguration is currently pending, make sure none will trigger // during disconnection either. cancelCheckBandwidthEvent(); // Protect mPacketSources from a swapPacketSource race condition through disconnect. // (finishDisconnect, onFinishDisconnect2) cancelBandwidthSwitch(); // cancel switch down monitor mSwitchDownMonitor.clear(); // cancel buffer polling cancelPollBuffering(); for (size_t i = 0; i < mFetcherInfos.size(); ++i) { mFetcherInfos.valueAt(i).mFetcher->stopAsync(); Loading Loading @@ -799,7 +756,7 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) { info.mDurationUs = -1ll; info.mIsPrepared = false; info.mToBeRemoved = false; looper()->registerHandler(info.mFetcher); mFetcherLooper->registerHandler(info.mFetcher); mFetcherInfos.add(uri, info); Loading Loading @@ -1201,19 +1158,6 @@ ssize_t LiveSession::getSelectedTrack(media_track_type type) const { } } bool LiveSession::canSwitchUp() { // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds. status_t err = OK; for (size_t i = 0; i < mPacketSources.size(); ++i) { sp<AnotherPacketSource> source = mPacketSources.valueAt(i); int64_t dur = source->getBufferedDurationUs(&err); if (err == OK && dur > 10000000) { return true; } } return false; } void LiveSession::changeConfiguration( int64_t timeUs, size_t bandwidthIndex, bool pickTrack) { // Protect mPacketSources from a swapPacketSource race condition through reconfiguration. Loading Loading @@ -1296,14 +1240,6 @@ void LiveSession::changeConfiguration( if (mContinuationCounter == 0) { msg->post(); if (mSeekReplyID != 0) { CHECK(mSeekReply != NULL); mSeekReply->setInt32("err", OK); mSeekReply->postReply(mSeekReplyID); mSeekReplyID = 0; mSeekReply.clear(); } } } Loading @@ -1323,6 +1259,30 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) { // All fetchers are either suspended or have been removed now. // If we're seeking, clear all packet sources before we report // seek complete, to prevent decoder from pulling stale data. int64_t timeUs; CHECK(msg->findInt64("timeUs", &timeUs)); if (timeUs >= 0) { mLastSeekTimeUs = timeUs; for (size_t i = 0; i < mPacketSources.size(); i++) { mPacketSources.editValueAt(i)->clear(); } mDiscontinuityOffsetTimesUs.clear(); mDiscontinuityAbsStartTimesUs.clear(); if (mSeekReplyID != 0) { CHECK(mSeekReply != NULL); mSeekReply->setInt32("err", OK); mSeekReply->postReply(mSeekReplyID); mSeekReplyID = 0; mSeekReply.clear(); } } uint32_t streamMask, resumeMask; CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask)); CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask)); Loading Loading @@ -1428,19 +1388,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { for (size_t j = 0; j < kMaxStreams; ++j) { if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) { sources[j] = mPacketSources.valueFor(indexToType(j)); if (j != kSubtitleIndex) { ALOGV("queueing dummy discontinuity for stream type %d", indexToType(j)); sp<AnotherPacketSource> discontinuityQueue; discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); discontinuityQueue->queueDiscontinuity( ATSParser::DISCONTINUITY_NONE, NULL, true); } } } FetcherInfo &info = mFetcherInfos.editValueAt(i); if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL || sources[kSubtitleIndex] != NULL) { Loading Loading @@ -1486,15 +1435,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { sources[j] = mPacketSources.valueFor(indexToType(j)); if (timeUs >= 0) { sources[j]->clear(); startTimeUs = timeUs; sp<AnotherPacketSource> discontinuityQueue; sp<AMessage> extra = new AMessage; extra->setInt64("timeUs", timeUs); discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); discontinuityQueue->queueDiscontinuity( ATSParser::DISCONTINUITY_TIME, extra, true); } else { int32_t type; sp<AMessage> meta; Loading Loading @@ -1532,9 +1473,10 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { if (j == kSubtitleIndex) { break; } sp<AnotherPacketSource> discontinuityQueue; discontinuityQueue = mDiscontinuities.valueFor(indexToType(j)); discontinuityQueue->queueDiscontinuity( ALOGV("stream[%d]: queue format change", j); sources[j]->queueDiscontinuity( ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true); } else { // adapting, queue discontinuities after resume Loading Loading @@ -1564,9 +1506,6 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) { // All fetchers have now been started, the configuration change // has completed. cancelCheckBandwidthEvent(); scheduleCheckBandwidthEvent(); ALOGV("XXX configuration change completed."); mReconfigurationInProgress = false; if (switching) { Loading Loading @@ -1623,47 +1562,35 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) { tryToFinishBandwidthSwitch(); } void LiveSession::onCheckSwitchDown() { if (mSwitchDownMonitor == NULL) { return; void LiveSession::schedulePollBuffering() { sp<AMessage> msg = new AMessage(kWhatPollBuffering, this); msg->setInt32("generation", mPollBufferingGeneration); msg->post(1000000ll); } if (mSwitchInProgress || mReconfigurationInProgress) { ALOGV("Switch/Reconfig in progress, defer switch down"); mSwitchDownMonitor->post(1000000ll); return; void LiveSession::cancelPollBuffering() { ++mPollBufferingGeneration; } for (size_t i = 0; i < kMaxStreams; ++i) { int32_t targetDuration; sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(indexToType(i)); sp<AMessage> meta = packetSource->getLatestDequeuedMeta(); void LiveSession::onPollBuffering() { ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, " "mInPreparationPhase %d, mStreamMask 0x%x", mSwitchInProgress, mReconfigurationInProgress, mInPreparationPhase, mStreamMask); if (meta != NULL && meta->findInt32("targetDuration", &targetDuration) ) { int64_t bufferedDurationUs = packetSource->getEstimatedDurationUs(); int64_t targetDurationUs = targetDuration * 1000000ll; if (bufferedDurationUs < targetDurationUs / 3) { (new AMessage(kWhatSwitchDown, this))->post(); break; } } } mSwitchDownMonitor->post(1000000ll); bool low, mid, high; if (checkBuffering(low, mid, high)) { if (mInPreparationPhase && mid) { postPrepared(OK); } void LiveSession::onSwitchDown() { if (mReconfigurationInProgress || mSwitchInProgress || mCurBandwidthIndex == 0) { return; // don't switch before we report prepared if (!mInPreparationPhase && (low || high)) { switchBandwidthIfNeeded(high); } ssize_t bandwidthIndex = getBandwidthIndex(); if (bandwidthIndex < mCurBandwidthIndex) { changeConfiguration(-1, bandwidthIndex, false); return; } schedulePollBuffering(); } // Mark switch done when: Loading @@ -1688,16 +1615,6 @@ void LiveSession::tryToFinishBandwidthSwitch() { } } void LiveSession::scheduleCheckBandwidthEvent() { sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, this); msg->setInt32("generation", mCheckBandwidthGeneration); msg->post(10000000ll); } void LiveSession::cancelCheckBandwidthEvent() { ++mCheckBandwidthGeneration; } void LiveSession::cancelBandwidthSwitch() { Mutex::Autolock lock(mSwapMutex); mSwitchGeneration++; Loading Loading @@ -1727,33 +1644,69 @@ void LiveSession::cancelBandwidthSwitch() { } } bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) { if (mReconfigurationInProgress || mSwitchInProgress) { bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) { low = mid = high = false; if (mSwitchInProgress || mReconfigurationInProgress) { ALOGV("Switch/Reconfig in progress, defer buffer polling"); return false; } if (mCurBandwidthIndex < 0) { return true; // TODO: Fine tune low/high mark. // We also need to pause playback if buffering is too low. // Currently during underflow, we depend on decoder to starve // to pause, but A/V could have different buffering left, // they're not paused together. // TODO: Report buffering level to NuPlayer for BUFFERING_UPDATE // Switch down if any of the fetchers are below low mark; // Switch up if all of the fetchers are over high mark. size_t activeCount, lowCount, midCount, highCount; activeCount = lowCount = midCount = highCount = 0; for (size_t i = 0; i < mPacketSources.size(); ++i) { // we don't check subtitles for buffering level if (!(mStreamMask & mPacketSources.keyAt(i) & (STREAMTYPE_AUDIO | STREAMTYPE_VIDEO))) { continue; } // ignore streams that never had any packet queued. // (it's possible that the variant only has audio or video) sp<AMessage> meta = mPacketSources[i]->getLatestEnqueuedMeta(); if (meta == NULL) { continue; } if (bandwidthIndex == (size_t)mCurBandwidthIndex) { return false; } else if (bandwidthIndex > (size_t)mCurBandwidthIndex) { return canSwitchUp(); } else { ++activeCount; int64_t bufferedDurationUs = mPacketSources[i]->getEstimatedDurationUs(); ALOGV("source[%d]: buffered %lld us", i, bufferedDurationUs); if (bufferedDurationUs < kLowWaterMark) { ++lowCount; break; } else if (bufferedDurationUs > kHighWaterMark) { ++midCount; ++highCount; } else if (bufferedDurationUs > kMidWaterMark) { ++midCount; } } if (activeCount > 0) { high = (highCount == activeCount); mid = (midCount == activeCount); low = (lowCount > 0); return true; } return false; } void LiveSession::onCheckBandwidth(const sp<AMessage> &msg) { size_t bandwidthIndex = getBandwidthIndex(); if (canSwitchBandwidthTo(bandwidthIndex)) { changeConfiguration(-1ll /* timeUs */, bandwidthIndex); } else { // Come back and check again 10 seconds later in case there is nothing to do now. // If we DO change configuration, once that completes it'll schedule a new // check bandwidth event with an incremented mCheckBandwidthGeneration. msg->post(10000000ll); void LiveSession::switchBandwidthIfNeeded(bool canSwitchUp) { ssize_t bandwidthIndex = getBandwidthIndex(); if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex) || (!canSwitchUp && bandwidthIndex < mCurBandwidthIndex)) { changeConfiguration(-1, bandwidthIndex, false); } } Loading @@ -1771,10 +1724,8 @@ void LiveSession::postPrepared(status_t err) { notify->post(); mInPreparationPhase = false; mSwitchDownMonitor = new AMessage(kWhatCheckSwitchDown, this); mSwitchDownMonitor->post(); } } // namespace android
media/libstagefright/httplive/LiveSession.h +18 −16 Original line number Diff line number Diff line Loading @@ -40,10 +40,6 @@ struct LiveSession : public AHandler { // Don't log any URLs. kFlagIncognito = 1, }; LiveSession( const sp<AMessage> ¬ify, uint32_t flags, const sp<IMediaHTTPService> &httpService); enum StreamIndex { kAudioIndex = 0, Loading @@ -57,6 +53,12 @@ struct LiveSession : public AHandler { STREAMTYPE_VIDEO = 1 << kVideoIndex, STREAMTYPE_SUBTITLES = 1 << kSubtitleIndex, }; LiveSession( const sp<AMessage> ¬ify, uint32_t flags, const sp<IMediaHTTPService> &httpService); status_t dequeueAccessUnit(StreamType stream, sp<ABuffer> *accessUnit); status_t getStreamFormat(StreamType stream, sp<AMessage> *format); Loading Loading @@ -110,11 +112,13 @@ private: kWhatChangeConfiguration3 = 'chC3', kWhatFinishDisconnect2 = 'fin2', kWhatSwapped = 'swap', kWhatCheckSwitchDown = 'ckSD', kWhatSwitchDown = 'sDwn', kWhatPollBuffering = 'poll', }; static const size_t kBandwidthHistoryBytes; static const int64_t kHighWaterMark; static const int64_t kMidWaterMark; static const int64_t kLowWaterMark; struct BandwidthItem { size_t mPlaylistIndex; Loading Loading @@ -169,6 +173,7 @@ private: sp<M3UParser> mPlaylist; sp<ALooper> mFetcherLooper; KeyedVector<AString, FetcherInfo> mFetcherInfos; uint32_t mStreamMask; Loading @@ -181,7 +186,6 @@ private: // we use this to track reconfiguration progress. uint32_t mSwapMask; KeyedVector<StreamType, sp<AnotherPacketSource> > mDiscontinuities; KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources; // A second set of packet sources that buffer content for the variant we're switching to. KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2; Loading Loading @@ -210,10 +214,11 @@ private: bool mFirstTimeUsValid; int64_t mFirstTimeUs; int64_t mLastSeekTimeUs; sp<AMessage> mSwitchDownMonitor; KeyedVector<size_t, int64_t> mDiscontinuityAbsStartTimesUs; KeyedVector<size_t, int64_t> mDiscontinuityOffsetTimesUs; int32_t mPollBufferingGeneration; sp<PlaylistFetcher> addFetcher(const char *uri); void onConnect(const sp<AMessage> &msg); Loading Loading @@ -257,27 +262,24 @@ private: void onChangeConfiguration2(const sp<AMessage> &msg); void onChangeConfiguration3(const sp<AMessage> &msg); void onSwapped(const sp<AMessage> &msg); void onCheckSwitchDown(); void onSwitchDown(); void tryToFinishBandwidthSwitch(); void scheduleCheckBandwidthEvent(); void cancelCheckBandwidthEvent(); // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources // from being swapped out on stale discontinuities while manipulating // mPacketSources/mPacketSources2. void cancelBandwidthSwitch(); bool canSwitchBandwidthTo(size_t bandwidthIndex); void onCheckBandwidth(const sp<AMessage> &msg); void schedulePollBuffering(); void cancelPollBuffering(); void onPollBuffering(); bool checkBuffering(bool &low, bool &mid, bool &high); void switchBandwidthIfNeeded(bool canSwitchUp); void finishDisconnect(); void postPrepared(status_t err); void swapPacketSource(StreamType stream); bool canSwitchUp(); DISALLOW_EVIL_CONSTRUCTORS(LiveSession); }; Loading
media/libstagefright/httplive/PlaylistFetcher.cpp +15 −47 File changed.Preview size limit exceeded, changes collapsed. Show changes
media/libstagefright/httplive/PlaylistFetcher.h +1 −5 Original line number Diff line number Diff line Loading @@ -36,6 +36,7 @@ class String8; struct PlaylistFetcher : public AHandler { static const int64_t kMinBufferedDurationUs; static const int32_t kDownloadBlockSize; static const int64_t kFetcherResumeThreshold; enum { kWhatStarted, Loading @@ -43,7 +44,6 @@ struct PlaylistFetcher : public AHandler { kWhatStopped, kWhatError, kWhatDurationUpdate, kWhatTemporarilyDoneFetching, kWhatPrepared, kWhatPreparationFailed, kWhatStartedAt, Loading Loading @@ -212,10 +212,6 @@ private: void updateDuration(); // Before resuming a fetcher in onResume, check the remaining duration is longer than that // returned by resumeThreshold. int64_t resumeThreshold(const sp<AMessage> &msg); DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher); }; Loading