Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c4547ba7 authored by Chong Zhang's avatar Chong Zhang
Browse files

HLS: some fixes for seek with discontinuity.

- keep old fetcher when seeking, unless the URI is changing.

- when restarting after a seek, check discontinuity seq, and
  queue format change if it's changed.

- add a simple kill switch to abort when stop (or pause for seek).

- when seeking, if searching for start time goes into 2nd segment,
  do not signal time discontinuity or reset first PTS.

- use setFormat() to set format in AnotherPacketSource, otherwise
  video/audio flags are not updated and format are not cleared on
  discontinuities.

- do not start queueing video access unit until first IDR after start

bug: 19656539

Change-Id: I79108d26964f59ea00d2eeac8f5f9318747f8541
parent c84eb736
Loading
Loading
Loading
Loading
+45 −42
Original line number Diff line number Diff line
@@ -76,8 +76,6 @@ LiveSession::LiveSession(
      mRealTimeBaseUs(0ll),
      mReconfigurationInProgress(false),
      mSwitchInProgress(false),
      mDisconnectReplyID(0),
      mSeekReplyID(0),
      mFirstTimeUsValid(false),
      mFirstTimeUs(0),
      mLastSeekTimeUs(0),
@@ -90,7 +88,6 @@ LiveSession::LiveSession(
    for (size_t i = 0; i < kMaxStreams; ++i) {
        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
        mBuffering[i] = false;
    }

    size_t numHistoryItems = kBandwidthHistoryBytes /
@@ -139,13 +136,24 @@ status_t LiveSession::dequeueAccessUnit(
    ssize_t idx = typeToIndex(stream);
    if (!packetSource->hasBufferAvailable(&finalResult)) {
        if (finalResult == OK) {
            mBuffering[idx] = true;
            return -EAGAIN;
        } else {
            return finalResult;
        }
    }

    // Do not let client pull data if we don't have format yet.
    // We might only have a format discontinuity queued without actual data.
    // When NuPlayerDecoder dequeues the format discontinuity, it will
    // immediately try to getFormat. If we return NULL, NuPlayerDecoder
    // thinks it can do seamless change, so will not shutdown decoder.
    // When the actual format arrives, it can't handle it and get stuck.
    // TODO: We need a method to check if the packet source has any
    //       data packets available, dequeuing should only start then.
    sp<MetaData> format = packetSource->getFormat();
    if (format == NULL) {
        return -EAGAIN;
    }
    int32_t targetDuration = 0;
    sp<AMessage> meta = packetSource->getLatestEnqueuedMeta();
    if (meta != NULL) {
@@ -160,18 +168,6 @@ status_t LiveSession::dequeueAccessUnit(
        targetDurationUs = PlaylistFetcher::kMinBufferedDurationUs;
    }

    if (mBuffering[idx]) {
        if (mSwitchInProgress
                || packetSource->isFinished(0)
                || packetSource->hasBufferAvailable(&finalResult)) {
            mBuffering[idx] = false;
        }
    }

    if (mBuffering[idx]) {
        return -EAGAIN;
    }

    // wait for counterpart
    sp<AnotherPacketSource> otherSource;
    uint32_t mask = mNewStreamMask & mStreamMask;
@@ -737,7 +733,7 @@ void LiveSession::onFinishDisconnect2() {
    response->setInt32("err", OK);

    response->postReply(mDisconnectReplyID);
    mDisconnectReplyID = 0;
    mDisconnectReplyID.clear();
}

sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
@@ -1192,11 +1188,12 @@ void LiveSession::changeConfiguration(

        bool discardFetcher = true;

        // If we're seeking all current fetchers are discarded.
        if (timeUs < 0ll) {
            // delay fetcher removal if not picking tracks
            discardFetcher = pickTrack;

        }

        for (size_t j = 0; j < kMaxStreams; ++j) {
            StreamType type = indexToType(j);
            if ((streamMask & type) && uri == URIs[j]) {
@@ -1205,12 +1202,13 @@ void LiveSession::changeConfiguration(
                discardFetcher = false;
            }
        }
        }

        if (discardFetcher) {
            mFetcherInfos.valueAt(i).mFetcher->stopAsync();
        } else {
            mFetcherInfos.valueAt(i).mFetcher->pauseAsync();
            // if we're seeking, pause immediately (no need to finish the segment)
            bool immediate = (timeUs >= 0ll);
            mFetcherInfos.valueAt(i).mFetcher->pauseAsync(immediate);
        }
    }

@@ -1274,11 +1272,11 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
        mDiscontinuityOffsetTimesUs.clear();
        mDiscontinuityAbsStartTimesUs.clear();

        if (mSeekReplyID != 0) {
        if (mSeekReplyID != NULL) {
            CHECK(mSeekReply != NULL);
            mSeekReply->setInt32("err", OK);
            mSeekReply->postReply(mSeekReplyID);
            mSeekReplyID = 0;
            mSeekReplyID.clear();
            mSeekReply.clear();
        }
    }
@@ -1287,9 +1285,6 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    // currently onChangeConfiguration2 is only called for seeking;
    // remove the following CHECK if using it else where.
    CHECK_EQ(resumeMask, 0);
    streamMask |= resumeMask;

    AString URIs[kMaxStreams];
@@ -1301,17 +1296,25 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
        }
    }

    // Determine which decoders to shutdown on the player side,
    // a decoder has to be shutdown if either
    // 1) its streamtype was active before but now longer isn't.
    // or
    // 2) its streamtype was already active and still is but the URI
    //    has changed.
    uint32_t changedMask = 0;
    for (size_t i = 0; i < kMaxStreams && i != kSubtitleIndex; ++i) {
        if (((mStreamMask & streamMask & indexToType(i))
                && !(URIs[i] == mStreams[i].mUri))
                || (mStreamMask & ~streamMask & indexToType(i))) {
        // stream URI could change even if onChangeConfiguration2 is only
        // used for seek. Seek could happen during a bw switch, in this
        // case bw switch will be cancelled, but the seekTo position will
        // fetch from the new URI.
        if ((mStreamMask & streamMask & indexToType(i))
                && !mStreams[i].mUri.empty()
                && !(URIs[i] == mStreams[i].mUri)) {
            ALOGV("stream %d changed: oldURI %s, newURI %s", i,
                    mStreams[i].mUri.c_str(), URIs[i].c_str());
            sp<AnotherPacketSource> source = mPacketSources.valueFor(indexToType(i));
            source->queueDiscontinuity(
                    ATSParser::DISCONTINUITY_FORMATCHANGE, NULL, true);
        }
        // Determine which decoders to shutdown on the player side,
        // a decoder has to be shutdown if its streamtype was active
        // before but now longer isn't.
        if ((mStreamMask & ~streamMask & indexToType(i))) {
            changedMask |= indexToType(i);
        }
    }
@@ -1394,7 +1397,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex], timeUs);
        } else {
            info.mToBeRemoved = true;
        }
@@ -1514,7 +1517,7 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
        mStreamMask = mNewStreamMask;
    }

    if (mDisconnectReplyID != 0) {
    if (mDisconnectReplyID != NULL) {
        finishDisconnect();
    }
}
+6 −0
Original line number Diff line number Diff line
@@ -251,6 +251,7 @@ M3UParser::M3UParser(
      mIsComplete(false),
      mIsEvent(false),
      mDiscontinuitySeq(0),
      mDiscontinuityCount(0),
      mSelectedIndex(-1) {
    mInitCheck = parse(data, size);
}
@@ -582,6 +583,7 @@ status_t M3UParser::parse(const void *_data, size_t size) {
                    itemMeta = new AMessage;
                }
                itemMeta->setInt32("discontinuity", true);
                ++mDiscontinuityCount;
            } else if (line.startsWith("#EXT-X-STREAM-INF")) {
                if (mMeta != NULL) {
                    return ERROR_MALFORMED;
@@ -609,6 +611,9 @@ status_t M3UParser::parse(const void *_data, size_t size) {
            } else if (line.startsWith("#EXT-X-MEDIA")) {
                err = parseMedia(line);
            } else if (line.startsWith("#EXT-X-DISCONTINUITY-SEQUENCE")) {
                if (mIsVariantPlaylist) {
                    return ERROR_MALFORMED;
                }
                size_t seq;
                err = parseDiscontinuitySequence(line, &seq);
                if (err == OK) {
@@ -628,6 +633,7 @@ status_t M3UParser::parse(const void *_data, size_t size) {
                        || !itemMeta->findInt64("durationUs", &durationUs)) {
                    return ERROR_MALFORMED;
                }
                itemMeta->setInt32("discontinuity-sequence", mDiscontinuitySeq + mDiscontinuityCount);
            }

            mItems.push();
+1 −0
Original line number Diff line number Diff line
@@ -70,6 +70,7 @@ private:
    bool mIsComplete;
    bool mIsEvent;
    size_t mDiscontinuitySeq;
    int32_t mDiscontinuityCount;

    sp<AMessage> mMeta;
    Vector<Item> mItems;
+104 −56
Original line number Diff line number Diff line
@@ -60,7 +60,6 @@ PlaylistFetcher::PlaylistFetcher(
        const char *uri,
        int32_t subtitleGeneration)
    : mNotify(notify),
      mStartTimeUsNotify(notify->dup()),
      mSession(session),
      mURI(uri),
      mStreamTypeMask(0),
@@ -74,16 +73,16 @@ PlaylistFetcher::PlaylistFetcher(
      mStartup(true),
      mAdaptive(false),
      mPrepared(false),
      mTimeChangeSignaled(false),
      mNextPTSTimeUs(-1ll),
      mMonitorQueueGeneration(0),
      mSubtitleGeneration(subtitleGeneration),
      mLastDiscontinuitySeq(-1ll),
      mStopping(false),
      mRefreshState(INITIAL_MINIMUM_RELOAD_DELAY),
      mFirstPTSValid(false),
      mAbsoluteTimeAnchorUs(0ll),
      mVideoBuffer(new AnotherPacketSource(NULL)) {
    memset(mPlaylistHash, 0, sizeof(mPlaylistHash));
    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
    mStartTimeUsNotify->setInt32("streamMask", 0);
}

PlaylistFetcher::~PlaylistFetcher() {
@@ -335,6 +334,11 @@ void PlaylistFetcher::cancelMonitorQueue() {
    ++mMonitorQueueGeneration;
}

void PlaylistFetcher::setStopping(bool stopping) {
    AutoMutex _l(mStoppingLock);
    mStopping = stopping;
}

void PlaylistFetcher::startAsync(
        const sp<AnotherPacketSource> &audioSource,
        const sp<AnotherPacketSource> &videoSource,
@@ -370,11 +374,16 @@ void PlaylistFetcher::startAsync(
    msg->post();
}

void PlaylistFetcher::pauseAsync() {
void PlaylistFetcher::pauseAsync(bool immediate) {
    if (immediate) {
        setStopping(true);
    }
    (new AMessage(kWhatPause, this))->post();
}

void PlaylistFetcher::stopAsync(bool clear) {
    setStopping(true);

    sp<AMessage> msg = new AMessage(kWhatStop, this);
    msg->setInt32("clear", clear);
    msg->post();
@@ -451,6 +460,10 @@ void PlaylistFetcher::onMessageReceived(const sp<AMessage> &msg) {

status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
    mPacketSources.clear();
    mStopParams.clear();
    mStartTimeUsNotify = mNotify->dup();
    mStartTimeUsNotify->setInt32("what", kWhatStartedAt);
    mStartTimeUsNotify->setInt32("streamMask", 0);

    uint32_t streamTypeMask;
    CHECK(msg->findInt32("streamTypeMask", (int32_t *)&streamTypeMask));
@@ -496,12 +509,18 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {
    mSegmentStartTimeUs = segmentStartTimeUs;
    mDiscontinuitySeq = startDiscontinuitySeq;

    mRefreshState = INITIAL_MINIMUM_RELOAD_DELAY;

    if (startTimeUs >= 0) {
        mStartTimeUs = startTimeUs;
        mFirstPTSValid = false;
        mSeqNumber = -1;
        mStartup = true;
        mPrepared = false;
        mIDRFound = false;
        mTimeChangeSignaled = false;
        mAdaptive = adaptive;
        mVideoBuffer->clear();
    }

    postMonitorQueue();
@@ -511,6 +530,9 @@ status_t PlaylistFetcher::onStart(const sp<AMessage> &msg) {

void PlaylistFetcher::onPause() {
    cancelMonitorQueue();
    mLastDiscontinuitySeq = mDiscontinuitySeq;

    setStopping(false);
}

void PlaylistFetcher::onStop(const sp<AMessage> &msg) {
@@ -527,6 +549,8 @@ void PlaylistFetcher::onStop(const sp<AMessage> &msg) {

    mPacketSources.clear();
    mStreamTypeMask = 0;

    setStopping(false);
}

// Resume until we have reached the boundary timestamps listed in `msg`; when
@@ -624,11 +648,7 @@ void PlaylistFetcher::onMonitorQueue() {
        targetDurationUs = targetDurationSecs * 1000000ll;
    }

    // buffer at least 3 times the target duration, or up to 10 seconds
    int64_t durationToBufferUs = targetDurationUs * 3;
    if (durationToBufferUs > kMinBufferedDurationUs)  {
        durationToBufferUs = kMinBufferedDurationUs;
    }
    int64_t durationToBufferUs = kMinBufferedDurationUs;

    int64_t bufferedDurationUs = 0ll;
    status_t finalResult = NOT_ENOUGH_DATA;
@@ -874,11 +894,22 @@ void PlaylistFetcher::onDownloadNext() {
                &uri,
                &itemMeta));

    CHECK(itemMeta->findInt32("discontinuity-sequence", &mDiscontinuitySeq));

    int32_t val;
    if (itemMeta->findInt32("discontinuity", &val) && val != 0) {
        mDiscontinuitySeq++;
        discontinuity = true;
    } else if (mLastDiscontinuitySeq >= 0
            && mDiscontinuitySeq != mLastDiscontinuitySeq) {
        // Seek jumped to a new discontinuity sequence. We need to signal
        // a format change to decoder. Decoder needs to shutdown and be
        // created again if seamless format change is unsupported.
        ALOGV("saw discontinuity: mStartup %d, mLastDiscontinuitySeq %d, "
                "mDiscontinuitySeq %d, mStartTimeUs %lld",
            mStartup, mLastDiscontinuitySeq, mDiscontinuitySeq, (long long)mStartTimeUs);
        discontinuity = true;
    }
    mLastDiscontinuitySeq = -1;

    int64_t range_offset, range_length;
    if (!itemMeta->findInt64("range-offset", &range_offset)
@@ -907,8 +938,52 @@ void PlaylistFetcher::onDownloadNext() {
        }
    }

    if ((mStartup && !mTimeChangeSignaled) || discontinuity) {
        // We need to signal a time discontinuity to ATSParser on the
        // first segment after start, or on a discontinuity segment.
        // Setting mNextPTSTimeUs informs extractAndQueueAccessUnitsXX()
        // to send the time discontinuity.
        if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
            // If this was a live event this made no sense since
            // we don't have access to all the segment before the current
            // one.
            mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
        }

        // Setting mTimeChangeSignaled to true, so that if start time
        // searching goes into 2nd segment (without a discontinuity),
        // we don't reset time again. It causes corruption when pending
        // data in ATSParser is cleared.
        mTimeChangeSignaled = true;
    }

    if (discontinuity) {
        ALOGI("queueing discontinuity (explicit=%d)", discontinuity);

        // Signal a format discontinuity to ATSParser to clear partial data
        // from previous streams. Not doing this causes bitstream corruption.
        mTSParser->signalDiscontinuity(
                ATSParser::DISCONTINUITY_FORMATCHANGE, NULL /* extra */);

        queueDiscontinuity(
                ATSParser::DISCONTINUITY_FORMATCHANGE,
                NULL /* extra */);

        if (mStartup && mStartTimeUsRelative && mFirstPTSValid) {
            // This means we guessed mStartTimeUs to be in the previous
            // segment (likely very close to the end), but either video or
            // audio has not found start by the end of that segment.
            //
            // If this new segment is not a discontinuity, keep searching.
            //
            // If this new segment even got a discontinuity marker, just
            // set mStartTimeUs=0, and take all samples from now on.
            mStartTimeUs = 0;
            mFirstPTSValid = false;
        }
    }

    // block-wise download
    bool startup = mStartup;
    ssize_t bytesRead;
    do {
        bytesRead = mSession->fetchFile(
@@ -938,29 +1013,6 @@ void PlaylistFetcher::onDownloadNext() {
            return;
        }

        if (startup || discontinuity) {
            // Signal discontinuity.

            if (mPlaylist->isComplete() || mPlaylist->isEvent()) {
                // If this was a live event this made no sense since
                // we don't have access to all the segment before the current
                // one.
                mNextPTSTimeUs = getSegmentStartTimeUs(mSeqNumber);
            }

            if (discontinuity) {
                ALOGI("queueing discontinuity (explicit=%d)", discontinuity);

                queueDiscontinuity(
                        ATSParser::DISCONTINUITY_FORMATCHANGE,
                        NULL /* extra */);

                discontinuity = false;
            }

            startup = false;
        }

        err = OK;
        if (bufferStartsWithTsSyncByte(buffer)) {
            // Incremental extraction is only supported for MPEG2 transport streams.
@@ -995,7 +1047,7 @@ void PlaylistFetcher::onDownloadNext() {
            return;
        }

    } while (bytesRead != 0);
    } while (bytesRead != 0 && !mStopping);

    if (bufferStartsWithTsSyncByte(buffer)) {
        // If we don't see a stream in the program table after fetching a full ts segment
@@ -1197,9 +1249,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
        mTSParser->signalDiscontinuity(
                ATSParser::DISCONTINUITY_TIME, extra);

        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
        mNextPTSTimeUs = -1ll;
        mFirstPTSValid = false;
    }

    size_t offset = 0;
@@ -1252,12 +1302,17 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
            continue;
        }

        int64_t timeUs;
        const char *mime;
        sp<MetaData> format  = source->getFormat();
        bool isAvc = format != NULL && format->findCString(kKeyMIMEType, &mime)
                && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC);

        sp<ABuffer> accessUnit;
        status_t finalResult;
        while (source->hasBufferAvailable(&finalResult)
                && source->dequeueAccessUnit(&accessUnit) == OK) {

            int64_t timeUs;
            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));

            if (mStartup) {
@@ -1272,30 +1327,25 @@ status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &bu
                    }
                }

                if (timeUs < mStartTimeUs) {
                if (timeUs < mStartTimeUs || (isAvc && !mIDRFound)) {
                    // buffer up to the closest preceding IDR frame
                    ALOGV("timeUs %" PRId64 " us < mStartTimeUs %" PRId64 " us",
                            timeUs, mStartTimeUs);
                    const char *mime;
                    sp<MetaData> format  = source->getFormat();
                    bool isAvc = false;
                    if (format != NULL && format->findCString(kKeyMIMEType, &mime)
                            && !strcasecmp(mime, MEDIA_MIMETYPE_VIDEO_AVC)) {
                        isAvc = true;
                    }
                    if (isAvc && IsIDR(accessUnit)) {
                    if (isAvc) {
                        if (IsIDR(accessUnit)) {
                            mVideoBuffer->clear();
                            mIDRFound = true;
                        }
                    if (isAvc) {
                        if (mIDRFound) {
                            mVideoBuffer->queueAccessUnit(accessUnit);
                        }
                    }

                    continue;
                }
            }

            CHECK(accessUnit->meta()->findInt64("timeUs", &timeUs));
            if (mStartTimeUsNotify != NULL && timeUs > mStartTimeUs) {
            if (mStartTimeUsNotify != NULL) {
                int32_t firstSeqNumberInPlaylist;
                if (mPlaylist->meta() == NULL || !mPlaylist->meta()->findInt32(
                            "media-sequence", &firstSeqNumberInPlaylist)) {
@@ -1464,8 +1514,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
    }

    if (mNextPTSTimeUs >= 0ll) {
        mFirstPTSValid = false;
        mAbsoluteTimeAnchorUs = mNextPTSTimeUs;
        mNextPTSTimeUs = -1ll;
    }

@@ -1566,7 +1614,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
    CHECK(packetSource->getFormat()->findInt32(kKeySampleRate, &sampleRate));

    int64_t timeUs = (PTS * 100ll) / 9ll;
    if (!mFirstPTSValid) {
    if (mStartup && !mFirstPTSValid) {
        mFirstPTSValid = true;
        mFirstTimeUs = timeUs;
    }
+9 −2
Original line number Diff line number Diff line
@@ -67,7 +67,7 @@ struct PlaylistFetcher : public AHandler {
            int32_t startDiscontinuitySeq = 0,
            bool adaptive = false);

    void pauseAsync();
    void pauseAsync(bool immediate = false);

    void stopAsync(bool clear = true);

@@ -130,13 +130,20 @@ private:
    int32_t mSeqNumber;
    int32_t mNumRetries;
    bool mStartup;
    bool mIDRFound;
    bool mAdaptive;
    bool mPrepared;
    bool mTimeChangeSignaled;
    int64_t mNextPTSTimeUs;

    int32_t mMonitorQueueGeneration;
    const int32_t mSubtitleGeneration;

    int32_t mLastDiscontinuitySeq;

    Mutex mStoppingLock;
    bool mStopping;

    enum RefreshState {
        INITIAL_MINIMUM_RELOAD_DELAY,
        FIRST_UNCHANGED_RELOAD_ATTEMPT,
@@ -152,7 +159,6 @@ private:
    bool mFirstPTSValid;
    uint64_t mFirstPTS;
    int64_t mFirstTimeUs;
    int64_t mAbsoluteTimeAnchorUs;
    sp<AnotherPacketSource> mVideoBuffer;

    // Stores the initialization vector to decrypt the next block of cipher text, which can
@@ -175,6 +181,7 @@ private:

    void postMonitorQueue(int64_t delayUs = 0, int64_t minDelayUs = 0);
    void cancelMonitorQueue();
    void setStopping(bool stopping);

    int64_t delayUsToRefreshPlaylist() const;
    status_t refreshPlaylist();
Loading