Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 514d7af2 authored by Chong Zhang's avatar Chong Zhang Committed by Android (Google) Code Review
Browse files

Merge "HLS: allow pause/resume in the middle of a segment"

parents 0e442128 a48d3728
Loading
Loading
Loading
Loading
+239 −173
Original line number Diff line number Diff line
@@ -128,6 +128,7 @@ LiveSession::LiveSession(
      mInPreparationPhase(true),
      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
      mCurBandwidthIndex(-1),
      mLastBandwidthBps(-1ll),
      mBandwidthEstimator(new BandwidthEstimator()),
      mStreamMask(0),
      mNewStreamMask(0),
@@ -159,24 +160,6 @@ LiveSession::~LiveSession() {
    }
}

sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
    ABuffer *discontinuity = new ABuffer(0);
    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
    discontinuity->meta()->setInt32("swapPacketSource", swap);
    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
    discontinuity->meta()->setInt64("timeUs", -1);
    return discontinuity;
}

void LiveSession::swapPacketSource(StreamType stream) {
    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
    sp<AnotherPacketSource> tmp = aps;
    aps = aps2;
    aps2 = tmp;
    aps2->clear();
}

status_t LiveSession::dequeueAccessUnit(
        StreamType stream, sp<ABuffer> *accessUnit) {
    if (!(mStreamMask & stream)) {
@@ -189,57 +172,22 @@ status_t LiveSession::dequeueAccessUnit(
    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);

    ssize_t idx = typeToIndex(stream);
    if (!packetSource->hasBufferAvailable(&finalResult)) {
        if (finalResult == OK) {
            return -EAGAIN;
        } else {
            return finalResult;
        }
    }

    // Do not let client pull data if we don't have format yet.
    // We might only have a format discontinuity queued without actual data.
    // Do not let client pull data if we don't have data packets yet.
    // We might only have a format discontinuity queued without data.
    // When NuPlayerDecoder dequeues the format discontinuity, it will
    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
    // thinks it can do seamless change, so will not shutdown decoder.
    // When the actual format arrives, it can't handle it and get stuck.
    // TODO: We need a method to check if the packet source has any
    //       data packets available, dequeuing should only start then.
    sp<MetaData> format = packetSource->getFormat();
    if (format == NULL) {
    if (!packetSource->hasDataBufferAvailable(&finalResult)) {
        if (finalResult == OK) {
            return -EAGAIN;
        } else {
            return finalResult;
        }
    int32_t targetDuration = 0;
    sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
    if (meta != NULL) {
        meta->findInt32("targetDuration", &targetDuration);
    }

    int64_t targetDurationUs = targetDuration * 1000000ll;
    if (targetDurationUs == 0 ||
            targetDurationUs > PlaylistFetcher::kMinBufferedDurationUs) {
        // Fetchers limit buffering to
        // min(3 * targetDuration, kMinBufferedDurationUs)
        targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
    }

    // wait for counterpart
    sp<AnotherPacketSource> otherSource;
    uint32_t mask = mNewStreamMask & mStreamMask;
    uint32_t fetchersMask  = 0;
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        uint32_t fetcherMask = mFetcherInfos.valueAt(i).mFetcher->getStreamTypeMask();
        fetchersMask |= fetcherMask;
    }
    mask &= fetchersMask;
    if (stream == STREAMTYPE_AUDIO && (mask & STREAMTYPE_VIDEO)) {
        otherSource = mPacketSources.valueFor(STREAMTYPE_VIDEO);
    } else if (stream == STREAMTYPE_VIDEO && (mask & STREAMTYPE_AUDIO)) {
        otherSource = mPacketSources.valueFor(STREAMTYPE_AUDIO);
    }
    if (otherSource != NULL && !otherSource->hasBufferAvailable(&finalResult)) {
        return finalResult == OK ? -EAGAIN : finalResult;
    }
    // Let the client dequeue as long as we have buffers available
    // Do not make pause/resume decisions here.

    status_t err = packetSource->dequeueAccessUnit(accessUnit);

@@ -278,21 +226,6 @@ status_t LiveSession::dequeueAccessUnit(
              type,
              extra == NULL ? "NULL" : extra->debugString().c_str());

        int32_t swap;
        if ((*accessUnit)->meta()->findInt32("swapPacketSource", &swap) && swap) {
            int32_t switchGeneration;
            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
            {
                Mutex::Autolock lock(mSwapMutex);
                if (switchGeneration == mSwitchGeneration) {
                    swapPacketSource(stream);
                    sp<AMessage> msg = new AMessage(kWhatSwapped, this);
                    msg->setInt32("stream", stream);
                    msg->setInt32("switchGeneration", switchGeneration);
                    msg->post();
                }
            }
        } else {
        size_t seq = strm.mCurDiscontinuitySeq;
        int64_t offsetTimeUs;
        if (mDiscontinuityOffsetTimesUs.indexOfKey(seq) >= 0) {
@@ -312,7 +245,6 @@ status_t LiveSession::dequeueAccessUnit(
        }

        mDiscontinuityOffsetTimesUs.add(seq, offsetTimeUs);
        }
    } else if (err == OK) {

        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
@@ -373,7 +305,6 @@ status_t LiveSession::dequeueAccessUnit(
}

status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
    if (!(mStreamMask & stream)) {
        return UNKNOWN_ERROR;
    }
@@ -389,6 +320,10 @@ status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
    return convertMetaDataToMessage(meta, format);
}

sp<HTTPBase> LiveSession::getHTTPDataSource() {
    return new MediaHTTP(mHTTPService->makeHTTPConnection());
}

void LiveSession::connectAsync(
        const char *url, const KeyedVector<String8, String8> *headers) {
    sp<AMessage> msg = new AMessage(kWhatConnect, this);
@@ -468,21 +403,27 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
                case PlaylistFetcher::kWhatPaused:
                case PlaylistFetcher::kWhatStopped:
                {
                    if (what == PlaylistFetcher::kWhatStopped) {
                    AString uri;
                    CHECK(msg->findString("uri", &uri));
                    ssize_t index = mFetcherInfos.indexOfKey(uri);
                    if (index < 0) {
                            // ignore duplicated kWhatStopped messages.
                        // ignore msgs from fetchers that's already gone
                        break;
                    }

                    if (what == PlaylistFetcher::kWhatStopped) {
                        tryToFinishBandwidthSwitch(uri);

                        mFetcherLooper->unregisterHandler(
                                mFetcherInfos[index].mFetcher->id());
                        mFetcherInfos.removeItemsAt(index);

                        if (mSwitchInProgress) {
                            tryToFinishBandwidthSwitch();
                    } else if (what == PlaylistFetcher::kWhatPaused) {
                        int32_t seekMode;
                        CHECK(msg->findInt32("seekMode", &seekMode));
                        for (size_t i = 0; i < kMaxStreams; ++i) {
                            if (mStreams[i].mUri == uri) {
                                mStreams[i].mSeekMode = (SeekMode) seekMode;
                            }
                        }
                    }

@@ -564,6 +505,13 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
                        break;
                    }

                    AString uri;
                    CHECK(msg->findString("uri", &uri));
                    ssize_t index = mFetcherInfos.indexOfKey(uri);
                    if (index >= 0) {
                        mFetcherInfos.editValueAt(index).mToBeResumed = true;
                    }

                    // Resume fetcher for the original variant; the resumed fetcher should
                    // continue until the timestamps found in msg, which is stored by the
                    // new fetcher to indicate where the new variant has started buffering.
@@ -607,12 +555,6 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
            break;
        }

        case kWhatSwapped:
        {
            onSwapped(msg);
            break;
        }

        case kWhatPollBuffering:
        {
            int32_t generation;
@@ -751,11 +693,10 @@ void LiveSession::onConnect(const sp<AMessage> &msg) {
}

void LiveSession::finishDisconnect() {
    ALOGV("finishDisconnect");

    // No reconfiguration is currently pending, make sure none will trigger
    // during disconnection either.

    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
    // (finishDisconnect, onFinishDisconnect2)
    cancelBandwidthSwitch();

    // cancel buffer polling
@@ -805,8 +746,8 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
    FetcherInfo info;
    info.mFetcher = new PlaylistFetcher(notify, this, uri, mSubtitleGeneration);
    info.mDurationUs = -1ll;
    info.mIsPrepared = false;
    info.mToBeRemoved = false;
    info.mToBeResumed = false;
    mFetcherLooper->registerHandler(info.mFetcher);

    mFetcherInfos.add(uri, info);
@@ -835,14 +776,15 @@ ssize_t LiveSession::fetchFile(
        int64_t range_offset, int64_t range_length,
        uint32_t block_size, /* download block size */
        sp<DataSource> *source, /* to return and reuse source */
        String8 *actualUrl) {
        String8 *actualUrl,
        bool forceConnectHTTP /* force connect HTTP when resuing source */) {
    off64_t size;
    sp<DataSource> temp_source;
    if (source == NULL) {
        source = &temp_source;
    }

    if (*source == NULL) {
    if (*source == NULL || forceConnectHTTP) {
        if (!strncasecmp(url, "file://", 7)) {
            *source = new FileSource(url + 7);
        } else if (strncasecmp(url, "http://", 7)
@@ -861,15 +803,20 @@ ssize_t LiveSession::fetchFile(
                                    ? "" : AStringPrintf("%lld",
                                            range_offset + range_length - 1).c_str()).c_str()));
            }
            status_t err = mHTTPDataSource->connect(url, &headers);

            HTTPBase* httpDataSource =
                    (*source == NULL) ? mHTTPDataSource.get() : (HTTPBase*)source->get();
            status_t err = httpDataSource->connect(url, &headers);

            if (err != OK) {
                return err;
            }

            if (*source == NULL) {
                *source = mHTTPDataSource;
            }
        }
    }

    status_t getSizeErr = (*source)->getSize(&size);
    if (getSizeErr != OK) {
@@ -1003,6 +950,57 @@ static double uniformRand() {
}
#endif

float LiveSession::getAbortThreshold(
        ssize_t currentBWIndex, ssize_t targetBWIndex) const {
    float abortThreshold = -1.0f;
    if (currentBWIndex > 0 && targetBWIndex < currentBWIndex) {
        /*
           If we're switching down, we need to decide whether to

           1) finish last segment of high-bandwidth variant, or
           2) abort last segment of high-bandwidth variant, and fetch an
              overlapping portion from low-bandwidth variant.

           Here we try to maximize the amount of buffer left when the
           switch point is met. Given the following parameters:

           B: our current buffering level in seconds
           T: target duration in seconds
           X: sample duration in seconds remain to fetch in last segment
           bw0: bandwidth of old variant (as specified in playlist)
           bw1: bandwidth of new variant (as specified in playlist)
           bw: measured bandwidth available

           If we choose 1), when switch happens at the end of current
           segment, our buffering will be
                  B + X - X * bw0 / bw

           If we choose 2), when switch happens where we aborted current
           segment, our buffering will be
                  B - (T - X) * bw1 / bw

           We should only choose 1) if
                  X/T < bw1 / (bw1 + bw0 - bw)
        */

        CHECK(mLastBandwidthBps >= 0);
        abortThreshold =
                (float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
             / ((float)mBandwidthItems.itemAt(targetBWIndex).mBandwidth
              + (float)mBandwidthItems.itemAt(currentBWIndex).mBandwidth
              - (float)mLastBandwidthBps * 0.7f);
        if (abortThreshold < 0.0f) {
            abortThreshold = -1.0f; // do not abort
        }
        ALOGV("Switching Down: bps %ld => %ld, measured %d, abort ratio %.2f",
                mBandwidthItems.itemAt(currentBWIndex).mBandwidth,
                mBandwidthItems.itemAt(targetBWIndex).mBandwidth,
                mLastBandwidthBps,
                abortThreshold);
    }
    return abortThreshold;
}

void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}
@@ -1210,8 +1208,6 @@ void LiveSession::changeConfiguration(
    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;

    mCurBandwidthIndex = bandwidthIndex;

    ALOGV("changeConfiguration => timeUs:%" PRId64 " us, bwIndex:%zu, pickTrack:%d",
          timeUs, bandwidthIndex, pickTrack);

@@ -1238,7 +1234,6 @@ void LiveSession::changeConfiguration(
        if (timeUs < 0ll) {
            // delay fetcher removal if not picking tracks
            discardFetcher = pickTrack;

        }

        for (size_t j = 0; j < kMaxStreams; ++j) {
@@ -1253,12 +1248,24 @@ void LiveSession::changeConfiguration(
        if (discardFetcher) {
            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
        } else {
            // if we're seeking, pause immediately (no need to finish the segment)
            bool immediate = (timeUs >= 0ll);
            mFetcherInfos.valueAt(i).mFetcher->pauseAsync(immediate);
            float threshold = -1.0f; // always finish fetching by default
            if (timeUs >= 0ll) {
                // seeking, no need to finish fetching
                threshold = 0.0f;
            } else if (!pickTrack) {
                // adapting, abort if remaining of current segment is over threshold
                threshold = getAbortThreshold(
                        mCurBandwidthIndex, bandwidthIndex);
            }

            ALOGV("Pausing with threshold %.3f", threshold);

            mFetcherInfos.valueAt(i).mFetcher->pauseAsync(threshold);
        }
    }

    mCurBandwidthIndex = bandwidthIndex;

    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if not seeking.
@@ -1451,7 +1458,6 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    }

    // streamMask now only contains the types that need a new fetcher created.

    if (streamMask != 0) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }
@@ -1472,6 +1478,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
        int64_t startTimeUs = -1;
        int64_t segmentStartTimeUs = -1ll;
        int32_t discontinuitySeq = -1;
        SeekMode seekMode = kSeekModeExactPosition;
        sp<AnotherPacketSource> sources[kMaxStreams];

        if (i == kSubtitleIndex) {
@@ -1491,6 +1498,16 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
                    sp<AMessage> meta;
                    if (pickTrack) {
                        // selecting

                        // FIXME:
                        // This should only apply to the track that's being picked, we
                        // need a bitmask to indicate that.
                        //
                        // It's possible that selectTrack() gets called during a bandwidth
                        // switch, and we needed to fetch a new variant. The new fetcher
                        // should start from where old fetcher left off, not where decoder
                        // is dequeueing at.

                        meta = sources[j]->getLatestDequeuedMeta();
                    } else {
                        // adapting
@@ -1527,14 +1544,22 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
                        ALOGV("stream[%zu]: queue format change", j);

                        sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                    } else {
                        // adapting, queue discontinuities after resume
                        sources[j] = mPacketSources2.valueFor(indexToType(j));
                        sources[j]->clear();
                        uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                        if (extraStreams & indexToType(j)) {
                            sources[j]->queueAccessUnit(createFormatChangeBuffer(/*swap*/ false));
                            sources[j]->queueDiscontinuity(
                                ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, true);
                        }
                        // the new fetcher might be providing streams that used to be
                        // provided by two different fetchers,  if one of the fetcher
                        // paused in the middle while the other somehow paused in next
                        // seg, we have to start from next seg.
                        if (seekMode < mStreams[j].mSeekMode) {
                            seekMode = mStreams[j].mSeekMode;
                        }
                    }
                }
@@ -1550,7 +1575,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
                startTimeUs < 0 ? mLastSeekTimeUs : startTimeUs,
                segmentStartTimeUs,
                discontinuitySeq,
                switching);
                seekMode);
    }

    // All fetchers have now been started, the configuration change
@@ -1569,25 +1594,57 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    }
}

void LiveSession::onSwapped(const sp<AMessage> &msg) {
    int32_t switchGeneration;
    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
    if (switchGeneration != mSwitchGeneration) {
void LiveSession::swapPacketSource(StreamType stream) {
    ALOGV("swapPacketSource: stream = %d", stream);

    // transfer packets from source2 to source
    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);

    // queue discontinuity in mPacketSource
    aps->queueDiscontinuity(ATSParser::DISCONTINUITY_FORMAT_ONLY, NULL, false);

    // queue packets in mPacketSource2 to mPacketSource
    status_t finalResult = OK;
    sp<ABuffer> accessUnit;
    while (aps2->hasBufferAvailable(&finalResult) && finalResult == OK &&
          OK == aps2->dequeueAccessUnit(&accessUnit)) {
        aps->queueAccessUnit(accessUnit);
    }
    aps2->clear();
}

void LiveSession::tryToFinishBandwidthSwitch(const AString &uri) {
    if (!mSwitchInProgress) {
        return;
    }

    ssize_t index = mFetcherInfos.indexOfKey(uri);
    if (index < 0 || !mFetcherInfos[index].mToBeRemoved) {
        return;
    }

    int32_t stream;
    CHECK(msg->findInt32("stream", &stream));
    // Swap packet source of streams provided by old variant
    for (size_t idx = 0; idx < kMaxStreams; idx++) {
        if (uri == mStreams[idx].mUri) {
            StreamType stream = indexToType(idx);

            swapPacketSource(stream);

    ssize_t idx = typeToIndex(stream);
    CHECK(idx >= 0);
            if ((mNewStreamMask & stream) && mStreams[idx].mNewUri.empty()) {
        ALOGW("swapping stream type %d %s to empty stream", stream, mStreams[idx].mUri.c_str());
                ALOGW("swapping stream type %d %s to empty stream",
                        stream, mStreams[idx].mUri.c_str());
            }
            mStreams[idx].mUri = mStreams[idx].mNewUri;
            mStreams[idx].mNewUri.clear();

            mSwapMask &= ~stream;
        }
    }

    mFetcherInfos.editValueAt(index).mToBeRemoved = false;

    ALOGV("tryToFinishBandwidthSwitch: mSwapMask=%x", mSwapMask);
    if (mSwapMask != 0) {
        return;
    }
@@ -1595,21 +1652,50 @@ void LiveSession::onSwapped(const sp<AMessage> &msg) {
    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
        swapPacketSource(extraStream);
        extraStreams &= ~extraStream;
        StreamType stream = (StreamType) (extraStreams & ~(extraStreams - 1));
        extraStreams &= ~stream;

        idx = typeToIndex(extraStream);
        swapPacketSource(stream);

        ssize_t idx = typeToIndex(stream);
        CHECK(idx >= 0);
        if (mStreams[idx].mNewUri.empty()) {
            ALOGW("swapping extra stream type %d %s to empty stream",
                    extraStream, mStreams[idx].mUri.c_str());
                    stream, mStreams[idx].mUri.c_str());
        }
        mStreams[idx].mUri = mStreams[idx].mNewUri;
        mStreams[idx].mNewUri.clear();
    }

    tryToFinishBandwidthSwitch();
    // Restart new fetcher (it was paused after the first 47k block)
    // and let it fetch into mPacketSources (not mPacketSources2)
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (info.mToBeResumed) {
            const AString &uri = mFetcherInfos.keyAt(i);
            sp<AnotherPacketSource> sources[kMaxStreams];
            for (size_t j = 0; j < kMaxStreams; ++j) {
                if (uri == mStreams[j].mUri) {
                    sources[j] = mPacketSources.valueFor(indexToType(j));
                }
            }
            if (sources[kAudioIndex] != NULL
                    || sources[kVideoIndex] != NULL
                    || sources[kSubtitleIndex] != NULL) {
                ALOGV("resuming fetcher %s", uri.c_str());
                info.mFetcher->startAsync(
                        sources[kAudioIndex],
                        sources[kVideoIndex],
                        sources[kSubtitleIndex]);
            }
            info.mToBeResumed = false;
        }
    }

    mStreamMask = mNewStreamMask;
    mSwitchInProgress = false;

    ALOGI("#### Finished Bandwidth Switch");
}

void LiveSession::schedulePollBuffering() {
@@ -1624,7 +1710,7 @@ void LiveSession::cancelPollBuffering() {

void LiveSession::onPollBuffering() {
    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
            "mInPreparationPhase %d, mCurBandwidthIndex %d, mStreamMask 0x%x",
            "mInPreparationPhase %d, mCurBandwidthIndex %zd, mStreamMask 0x%x",
        mSwitchInProgress, mReconfigurationInProgress,
        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);

@@ -1643,30 +1729,9 @@ void LiveSession::onPollBuffering() {
    schedulePollBuffering();
}

// Mark switch done when:
//   1. all old buffers are swapped out
void LiveSession::tryToFinishBandwidthSwitch() {
    if (!mSwitchInProgress) {
        return;
    }

    bool needToRemoveFetchers = false;
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
            needToRemoveFetchers = true;
            break;
        }
    }

    if (!needToRemoveFetchers && mSwapMask == 0) {
        ALOGI("mSwitchInProgress = false");
        mStreamMask = mNewStreamMask;
        mSwitchInProgress = false;
    }
}

void LiveSession::cancelBandwidthSwitch() {
    Mutex::Autolock lock(mSwapMutex);
    ALOGV("cancelBandwidthSwitch: mSwitchGen(%d)++", mSwitchGeneration);

    mSwitchGeneration++;
    mSwitchInProgress = false;
    mSwapMask = 0;
@@ -1760,6 +1825,7 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
    int32_t bandwidthBps;
    if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) {
        ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
        mLastBandwidthBps = bandwidthBps;
    } else {
        ALOGV("no bandwidth estimate.");
        return;
@@ -1778,7 +1844,7 @@ void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
            return;
        }

        ALOGI("#### Initiate Bandwidth Switch: %d => %d",
        ALOGI("#### Starting Bandwidth Switch: %zd => %zd",
                mCurBandwidthIndex, bandwidthIndex);
        changeConfiguration(-1, bandwidthIndex, false);
    }
+20 −24

File changed.

Preview size limit exceeded, changes collapsed.

+283 −71

File changed.

Preview size limit exceeded, changes collapsed.

+21 −9
Original line number Diff line number Diff line
@@ -65,9 +65,9 @@ struct PlaylistFetcher : public AHandler {
            int64_t segmentStartTimeUs = -1ll, // starting position within playlist
            // startTimeUs!=segmentStartTimeUs only when playlist is live
            int32_t startDiscontinuitySeq = 0,
            bool adaptive = false);
            LiveSession::SeekMode seekMode = LiveSession::kSeekModeExactPosition);

    void pauseAsync(bool immediate = false);
    void pauseAsync(float thresholdRatio);

    void stopAsync(bool clear = true);

@@ -95,6 +95,8 @@ private:
        kWhatDownloadNext   = 'dlnx',
    };

    struct DownloadState;

    static const int64_t kMaxMonitorDelayUs;
    static const int32_t kNumSkipFrames;

@@ -105,6 +107,7 @@ private:
    sp<AMessage> mNotify;
    sp<AMessage> mStartTimeUsNotify;

    sp<HTTPBase> mHTTPDataSource;
    sp<LiveSession> mSession;
    AString mURI;

@@ -131,7 +134,7 @@ private:
    int32_t mNumRetries;
    bool mStartup;
    bool mIDRFound;
    bool mAdaptive;
    int32_t mSeekMode;
    bool mPrepared;
    bool mTimeChangeSignaled;
    int64_t mNextPTSTimeUs;
@@ -141,9 +144,6 @@ private:

    int32_t mLastDiscontinuitySeq;

    Mutex mStoppingLock;
    bool mStopping;

    enum RefreshState {
        INITIAL_MINIMUM_RELOAD_DELAY,
        FIRST_UNCHANGED_RELOAD_ATTEMPT,
@@ -157,8 +157,8 @@ private:
    sp<ATSParser> mTSParser;

    bool mFirstPTSValid;
    uint64_t mFirstPTS;
    int64_t mFirstTimeUs;
    int64_t mSegmentFirstPTS;
    sp<AnotherPacketSource> mVideoBuffer;

    // Stores the initialization vector to decrypt the next block of cipher text, which can
@@ -166,6 +166,11 @@ private:
    // the last block of cipher text (cipher-block chaining).
    unsigned char mAESInitVec[16];

    Mutex mThresholdLock;
    float mThresholdRatio;

    sp<DownloadState> mDownloadState;

    // Set first to true if decrypting the first segment of a playlist segment. When
    // first is true, reset the initialization vector based on the available
    // information in the manifest; otherwise, use the initialization vector as
@@ -181,7 +186,8 @@ private:

    void postMonitorQueue(int64_t delayUs = 0, int64_t minDelayUs = 0);
    void cancelMonitorQueue();
    void setStopping(bool stopping);
    void setStoppingThreshold(float thresholdRatio);
    bool shouldPauseDownload(bool startFound);

    int64_t delayUsToRefreshPlaylist() const;
    status_t refreshPlaylist();
@@ -195,6 +201,11 @@ private:
    void onStop(const sp<AMessage> &msg);
    void onMonitorQueue();
    void onDownloadNext();
    bool initDownloadState(
            AString &uri,
            sp<AMessage> &itemMeta,
            int32_t &firstSeqNumberInPlaylist,
            int32_t &lastSeqNumberInPlaylist);

    // Resume a fetcher to continue until the stopping point stored in msg.
    status_t onResumeUntil(const sp<AMessage> &msg);
@@ -213,7 +224,8 @@ private:
    void queueDiscontinuity(
            ATSParser::DiscontinuityType type, const sp<AMessage> &extra);

    int32_t getSeqNumberWithAnchorTime(int64_t anchorTimeUs) const;
    int32_t getSeqNumberWithAnchorTime(
            int64_t anchorTimeUs, int64_t targetDurationUs) const;
    int32_t getSeqNumberForDiscontinuity(size_t discontinuitySeq) const;
    int32_t getSeqNumberForTime(int64_t timeUs) const;

+3 −0
Original line number Diff line number Diff line
@@ -46,6 +46,9 @@ struct ATSParser : public RefBase {
            DISCONTINUITY_AUDIO_FORMAT
                | DISCONTINUITY_VIDEO_FORMAT
                | DISCONTINUITY_TIME,
        DISCONTINUITY_FORMAT_ONLY       =
            DISCONTINUITY_AUDIO_FORMAT
                | DISCONTINUITY_VIDEO_FORMAT,
    };

    enum Flags {
Loading