Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 54000666 authored by Wonsik Kim's avatar Wonsik Kim
Browse files

Implement seek for MPEG2TSExtractor

TODO: Use bandwidth-based estimation to seek forward long period.

Bug: 20126845
Change-Id: I5e2f90784a9ce0dce348715dfcfc4f83ee196170
parent 6267b539
Loading
Loading
Loading
Loading
+13 −0
Original line number Diff line number Diff line
@@ -20,7 +20,9 @@

#include <media/stagefright/foundation/ABase.h>
#include <media/stagefright/MediaExtractor.h>
#include <media/stagefright/MediaSource.h>
#include <utils/threads.h>
#include <utils/KeyedVector.h>
#include <utils/Vector.h>

namespace android {
@@ -54,10 +56,21 @@ private:

    Vector<sp<AnotherPacketSource> > mSourceImpls;

    Vector<KeyedVector<int64_t, off64_t> > mSyncPoints;
    // Sync points used for seeking --- normally one for video track is used.
    // If no video track is present, audio track will be used instead.
    KeyedVector<int64_t, off64_t> *mSeekSyncPoints;

    off64_t mOffset;

    void init();
    status_t feedMore();
    status_t seek(int64_t seekTimeUs,
            const MediaSource::ReadOptions::SeekMode& seekMode);
    status_t queueDiscontinuityForSeek(int64_t actualSeekTimeUs);
    status_t seekBeyond(int64_t seekTimeUs);

    status_t feedUntilBufferAvailable(const sp<AnotherPacketSource> &impl);

    DISALLOW_EVIL_CONSTRUCTORS(MPEG2TSExtractor);
};
+68 −26
Original line number Diff line number Diff line
@@ -54,10 +54,13 @@ struct ATSParser::Program : public RefBase {
    bool parsePSISection(
            unsigned pid, ABitReader *br, status_t *err);

    // Pass to appropriate stream according to pid, and set event if it's a PES
    // with a sync frame.
    // Note that the method itself does not touch event.
    bool parsePID(
            unsigned pid, unsigned continuity_counter,
            unsigned payload_unit_start_indicator,
            ABitReader *br, status_t *err);
            ABitReader *br, status_t *err, SyncEvent *event);

    void signalDiscontinuity(
            DiscontinuityType type, const sp<AMessage> &extra);
@@ -118,10 +121,14 @@ struct ATSParser::Stream : public RefBase {
    unsigned pid() const { return mElementaryPID; }
    void setPID(unsigned pid) { mElementaryPID = pid; }

    // Parse the payload and set event when PES with a sync frame is detected.
    // This method knows when a PES starts; so record mPesStartOffset in that
    // case.
    status_t parse(
            unsigned continuity_counter,
            unsigned payload_unit_start_indicator,
            ABitReader *br);
            ABitReader *br,
            SyncEvent *event);

    void signalDiscontinuity(
            DiscontinuityType type, const sp<AMessage> &extra);
@@ -150,17 +157,24 @@ private:
    bool mEOSReached;

    uint64_t mPrevPTS;
    off64_t mPesStartOffset;

    ElementaryStreamQueue *mQueue;

    status_t flush();
    status_t parsePES(ABitReader *br);

    // Flush accumulated payload if necessary --- i.e. at EOS or at the start of
    // another payload. event is set if the flushed payload is PES with a sync
    // frame.
    status_t flush(SyncEvent *event);
    // Strip and parse PES headers and pass remaining payload into onPayload
    // with parsed metadata. event is set if the PES contains a sync frame.
    status_t parsePES(ABitReader *br, SyncEvent *event);

    // Feed the payload into mQueue and if a packet is identified, queue it
    // into mSource. If the packet is a sync frame. set event with start offset
    // and timestamp of the packet.
    void onPayloadData(
            unsigned PTS_DTS_flags, uint64_t PTS, uint64_t DTS,
            const uint8_t *data, size_t size);

    void extractAACFrames(const sp<ABuffer> &buffer);
            const uint8_t *data, size_t size, SyncEvent *event);

    DISALLOW_EVIL_CONSTRUCTORS(Stream);
};
@@ -190,6 +204,17 @@ private:
    DISALLOW_EVIL_CONSTRUCTORS(PSISection);
};

ATSParser::SyncEvent::SyncEvent(off64_t offset)
    : mInit(false), mOffset(offset), mTimeUs(0) {}

void ATSParser::SyncEvent::init(off64_t offset, const sp<MediaSource> &source,
        int64_t timeUs) {
    mInit = true;
    mOffset = offset;
    mMediaSource = source;
    mTimeUs = timeUs;
}

////////////////////////////////////////////////////////////////////////////////

ATSParser::Program::Program(
@@ -220,7 +245,7 @@ bool ATSParser::Program::parsePSISection(
bool ATSParser::Program::parsePID(
        unsigned pid, unsigned continuity_counter,
        unsigned payload_unit_start_indicator,
        ABitReader *br, status_t *err) {
        ABitReader *br, status_t *err, SyncEvent *event) {
    *err = OK;

    ssize_t index = mStreams.indexOfKey(pid);
@@ -229,7 +254,7 @@ bool ATSParser::Program::parsePID(
    }

    *err = mStreams.editValueAt(index)->parse(
            continuity_counter, payload_unit_start_indicator, br);
            continuity_counter, payload_unit_start_indicator, br, event);

    return true;
}
@@ -628,7 +653,8 @@ ATSParser::Stream::~Stream() {

status_t ATSParser::Stream::parse(
        unsigned continuity_counter,
        unsigned payload_unit_start_indicator, ABitReader *br) {
        unsigned payload_unit_start_indicator, ABitReader *br,
        SyncEvent *event) {
    if (mQueue == NULL) {
        return OK;
    }
@@ -659,12 +685,13 @@ status_t ATSParser::Stream::parse(
    mExpectedContinuityCounter = (continuity_counter + 1) & 0x0f;

    if (payload_unit_start_indicator) {
        off64_t offset = (event != NULL) ? event->getOffset() : 0;
        if (mPayloadStarted) {
            // Otherwise we run the danger of receiving the trailing bytes
            // of a PES packet that we never saw the start of and assuming
            // we have a a complete PES packet.

            status_t err = flush();
            status_t err = flush(event);

            if (err != OK) {
                return err;
@@ -672,6 +699,7 @@ status_t ATSParser::Stream::parse(
        }

        mPayloadStarted = true;
        mPesStartOffset = offset;
    }

    if (!mPayloadStarted) {
@@ -785,10 +813,10 @@ void ATSParser::Stream::signalEOS(status_t finalResult) {
        mSource->signalEOS(finalResult);
    }
    mEOSReached = true;
    flush();
    flush(NULL);
}

status_t ATSParser::Stream::parsePES(ABitReader *br) {
status_t ATSParser::Stream::parsePES(ABitReader *br, SyncEvent *event) {
    unsigned packet_startcode_prefix = br->getBits(24);

    ALOGV("packet_startcode_prefix = 0x%08x", packet_startcode_prefix);
@@ -973,13 +1001,13 @@ status_t ATSParser::Stream::parsePES(ABitReader *br) {
            }

            onPayloadData(
                    PTS_DTS_flags, PTS, DTS, br->data(), dataLength);
                    PTS_DTS_flags, PTS, DTS, br->data(), dataLength, event);

            br->skipBits(dataLength * 8);
        } else {
            onPayloadData(
                    PTS_DTS_flags, PTS, DTS,
                    br->data(), br->numBitsLeft() / 8);
                    br->data(), br->numBitsLeft() / 8, event);

            size_t payloadSizeBits = br->numBitsLeft();
            if (payloadSizeBits % 8 != 0u) {
@@ -1003,7 +1031,7 @@ status_t ATSParser::Stream::parsePES(ABitReader *br) {
    return OK;
}

status_t ATSParser::Stream::flush() {
status_t ATSParser::Stream::flush(SyncEvent *event) {
    if (mBuffer->size() == 0) {
        return OK;
    }
@@ -1012,7 +1040,7 @@ status_t ATSParser::Stream::flush() {

    ABitReader br(mBuffer->data(), mBuffer->size());

    status_t err = parsePES(&br);
    status_t err = parsePES(&br, event);

    mBuffer->setRange(0, 0);

@@ -1021,7 +1049,7 @@ status_t ATSParser::Stream::flush() {

void ATSParser::Stream::onPayloadData(
        unsigned PTS_DTS_flags, uint64_t PTS, uint64_t /* DTS */,
        const uint8_t *data, size_t size) {
        const uint8_t *data, size_t size, SyncEvent *event) {
#if 0
    ALOGI("payload streamType 0x%02x, PTS = 0x%016llx, dPTS = %lld",
          mStreamType,
@@ -1048,6 +1076,7 @@ void ATSParser::Stream::onPayloadData(
    }

    sp<ABuffer> accessUnit;
    bool found = false;
    while ((accessUnit = mQueue->dequeueAccessUnit()) != NULL) {
        if (mSource == NULL) {
            sp<MetaData> meta = mQueue->getFormat();
@@ -1075,6 +1104,17 @@ void ATSParser::Stream::onPayloadData(
            }
            mSource->queueAccessUnit(accessUnit);
        }

        if ((event != NULL) && !found && mQueue->getFormat() != NULL) {
            int32_t sync = 0;
            if (accessUnit->meta()->findInt32("isSync", &sync) && sync) {
                int64_t timeUs;
                if (accessUnit->meta()->findInt64("timeUs", &timeUs)) {
                    found = true;
                    event->init(mPesStartOffset, mSource, timeUs);
                }
            }
        }
    }
}

@@ -1127,14 +1167,15 @@ ATSParser::ATSParser(uint32_t flags)
ATSParser::~ATSParser() {
}

status_t ATSParser::feedTSPacket(const void *data, size_t size) {
status_t ATSParser::feedTSPacket(const void *data, size_t size,
        SyncEvent *event) {
    if (size != kTSPacketSize) {
        ALOGE("Wrong TS packet size");
        return BAD_VALUE;
    }

    ABitReader br((const uint8_t *)data, kTSPacketSize);
    return parseTS(&br);
    return parseTS(&br, event);
}

void ATSParser::signalDiscontinuity(
@@ -1262,7 +1303,8 @@ void ATSParser::parseProgramAssociationTable(ABitReader *br) {
status_t ATSParser::parsePID(
        ABitReader *br, unsigned PID,
        unsigned continuity_counter,
        unsigned payload_unit_start_indicator) {
        unsigned payload_unit_start_indicator,
        SyncEvent *event) {
    ssize_t sectionIndex = mPSISections.indexOfKey(PID);

    if (sectionIndex >= 0) {
@@ -1334,7 +1376,7 @@ status_t ATSParser::parsePID(
        status_t err;
        if (mPrograms.editItemAt(i)->parsePID(
                    PID, continuity_counter, payload_unit_start_indicator,
                    br, &err)) {
                    br, &err, event)) {
            if (err != OK) {
                return err;
            }
@@ -1405,7 +1447,7 @@ status_t ATSParser::parseAdaptationField(ABitReader *br, unsigned PID) {
    return OK;
}

status_t ATSParser::parseTS(ABitReader *br) {
status_t ATSParser::parseTS(ABitReader *br, SyncEvent *event) {
    ALOGV("---");

    unsigned sync_byte = br->getBits(8);
@@ -1444,8 +1486,8 @@ status_t ATSParser::parseTS(ABitReader *br) {
    }
    if (err == OK) {
        if (adaptation_field_control == 1 || adaptation_field_control == 3) {
            err = parsePID(
                    br, PID, continuity_counter, payload_unit_start_indicator);
            err = parsePID(br, PID, continuity_counter,
                    payload_unit_start_indicator, event);
        }
    }

+50 −6
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@

#include <media/stagefright/foundation/ABase.h>
#include <media/stagefright/foundation/AMessage.h>
#include <media/stagefright/MediaSource.h>
#include <utils/KeyedVector.h>
#include <utils/Vector.h>
#include <utils/RefBase.h>
@@ -30,7 +31,6 @@ namespace android {

class ABitReader;
struct ABuffer;
struct MediaSource;

struct ATSParser : public RefBase {
    enum DiscontinuityType {
@@ -62,9 +62,43 @@ struct ATSParser : public RefBase {
        ALIGNED_VIDEO_DATA         = 2,
    };

    // Event is used to signal sync point event at feedTSPacket().
    struct SyncEvent {
        SyncEvent(off64_t offset);

        void init(off64_t offset, const sp<MediaSource> &source,
                int64_t timeUs);

        bool isInit() { return mInit; }
        off64_t getOffset() { return mOffset; }
        const sp<MediaSource> &getMediaSource() { return mMediaSource; }
        int64_t getTimeUs() { return mTimeUs; }

    private:
        bool mInit;
        /*
         * mInit == false: the current offset
         * mInit == true: the start offset of sync payload
         */
        off64_t mOffset;
        /* The media source object for this event. */
        sp<MediaSource> mMediaSource;
        /* The timestamp of the sync frame. */
        int64_t mTimeUs;
    };

    ATSParser(uint32_t flags = 0);

    status_t feedTSPacket(const void *data, size_t size);
    // Feed a TS packet into the parser. uninitialized event with the start
    // offset of this TS packet goes in, and if the parser detects PES with
    // a sync frame, the event will be initiailzed with the start offset of the
    // PES. Note that the offset of the event can be different from what we fed,
    // as a PES may consist of multiple TS packets.
    //
    // Even in the case feedTSPacket() returns non-OK value, event still may be
    // initialized if the parsing failed after the detection.
    status_t feedTSPacket(
            const void *data, size_t size, SyncEvent *event = NULL);

    void signalDiscontinuity(
            DiscontinuityType type, const sp<AMessage> &extra);
@@ -126,15 +160,25 @@ private:

    void parseProgramAssociationTable(ABitReader *br);
    void parseProgramMap(ABitReader *br);
    void parsePES(ABitReader *br);

    // Parse PES packet where br is pointing to. If the PES contains a sync
    // frame, set event with the time and the start offset of this PES.
    // Note that the method itself does not touch event.
    void parsePES(ABitReader *br, SyncEvent *event);

    // Strip remaining packet headers and pass to appropriate program/stream
    // to parse the payload. If the payload turns out to be PES and contains
    // a sync frame, event shall be set with the time and start offset of the
    // PES.
    // Note that the method itself does not touch event.
    status_t parsePID(
        ABitReader *br, unsigned PID,
        unsigned continuity_counter,
        unsigned payload_unit_start_indicator);
        unsigned payload_unit_start_indicator,
        SyncEvent *event);

    status_t parseAdaptationField(ABitReader *br, unsigned PID);
    status_t parseTS(ABitReader *br);
    // see feedTSPacket().
    status_t parseTS(ABitReader *br, SyncEvent *event);

    void updatePCR(unsigned PID, uint64_t PCR, size_t byteOffsetFromStart);

+259 −41
Original line number Diff line number Diff line
@@ -21,12 +21,14 @@
#include "include/MPEG2TSExtractor.h"
#include "include/NuCachedSource2.h"

#include <media/stagefright/foundation/ABuffer.h>
#include <media/stagefright/foundation/ADebug.h>
#include <media/stagefright/DataSource.h>
#include <media/stagefright/MediaDefs.h>
#include <media/stagefright/MediaErrors.h>
#include <media/stagefright/MediaSource.h>
#include <media/stagefright/MetaData.h>
#include <media/IStreamSource.h>
#include <utils/String8.h>

#include "AnotherPacketSource.h"
@@ -40,7 +42,7 @@ struct MPEG2TSSource : public MediaSource {
    MPEG2TSSource(
            const sp<MPEG2TSExtractor> &extractor,
            const sp<AnotherPacketSource> &impl,
            bool seekable);
            bool doesSeek);

    virtual status_t start(MetaData *params = NULL);
    virtual status_t stop();
@@ -54,8 +56,8 @@ private:
    sp<AnotherPacketSource> mImpl;

    // If there are both audio and video streams, only the video stream
    // will be seekable, otherwise the single stream will be seekable.
    bool mSeekable;
    // will signal seek on the extractor; otherwise the single stream will seek.
    bool mDoesSeek;

    DISALLOW_EVIL_CONSTRUCTORS(MPEG2TSSource);
};
@@ -63,10 +65,10 @@ private:
MPEG2TSSource::MPEG2TSSource(
        const sp<MPEG2TSExtractor> &extractor,
        const sp<AnotherPacketSource> &impl,
        bool seekable)
        bool doesSeek)
    : mExtractor(extractor),
      mImpl(impl),
      mSeekable(seekable) {
      mDoesSeek(doesSeek) {
}

status_t MPEG2TSSource::start(MetaData *params) {
@@ -85,27 +87,18 @@ status_t MPEG2TSSource::read(
        MediaBuffer **out, const ReadOptions *options) {
    *out = NULL;

    status_t finalResult;
    while (!mImpl->hasBufferAvailable(&finalResult)) {
        if (finalResult != OK) {
            return ERROR_END_OF_STREAM;
        }

        status_t err = mExtractor->feedMore();
    int64_t seekTimeUs;
    ReadOptions::SeekMode seekMode;
    if (mDoesSeek && options && options->getSeekTo(&seekTimeUs, &seekMode)) {
        // seek is needed
        status_t err = mExtractor->seek(seekTimeUs, seekMode);
        if (err != OK) {
            mImpl->signalEOS(err);
            return err;
        }
    }

    int64_t seekTimeUs;
    ReadOptions::SeekMode seekMode;
    if (mSeekable && options && options->getSeekTo(&seekTimeUs, &seekMode)) {
        // A seek was requested, but we don't actually support seeking and so can only "seek" to
        // the current position
        int64_t nextBufTimeUs;
        if (mImpl->nextBufferTime(&nextBufTimeUs) != OK || seekTimeUs != nextBufTimeUs) {
            return ERROR_UNSUPPORTED;
        }
    if (mExtractor->feedUntilBufferAvailable(mImpl) != OK) {
        return ERROR_END_OF_STREAM;
    }

    return mImpl->read(out, options);
@@ -129,23 +122,10 @@ sp<MediaSource> MPEG2TSExtractor::getTrack(size_t index) {
        return NULL;
    }

    bool seekable = true;
    if (mSourceImpls.size() > 1) {
        if (mSourceImpls.size() != 2u) {
            ALOGE("Wrong size");
            return NULL;
        }

        sp<MetaData> meta = mSourceImpls.editItemAt(index)->getFormat();
        const char *mime;
        CHECK(meta->findCString(kKeyMIMEType, &mime));

        if (!strncasecmp("audio/", mime, 6)) {
            seekable = false;
        }
    }

    return new MPEG2TSSource(this, mSourceImpls.editItemAt(index), seekable);
    // The seek reference track (video if present; audio otherwise) performs
    // seek requests, while other tracks ignore requests.
    return new MPEG2TSSource(this, mSourceImpls.editItemAt(index),
            (mSeekSyncPoints == &mSyncPoints.editItemAt(index)));
}

sp<MetaData> MPEG2TSExtractor::getTrackMetaData(
@@ -178,6 +158,8 @@ void MPEG2TSExtractor::init() {
            if (impl != NULL) {
                haveVideo = true;
                mSourceImpls.push(impl);
                mSyncPoints.push();
                mSeekSyncPoints = &mSyncPoints.editTop();
            }
        }

@@ -189,6 +171,10 @@ void MPEG2TSExtractor::init() {
            if (impl != NULL) {
                haveAudio = true;
                mSourceImpls.push(impl);
                mSyncPoints.push();
                if (!haveVideo) {
                    mSeekSyncPoints = &mSyncPoints.editTop();
                }
            }
        }

@@ -197,6 +183,55 @@ void MPEG2TSExtractor::init() {
        }
    }

    off64_t size;
    if (mDataSource->getSize(&size) == OK && (haveAudio || haveVideo)) {
        sp<AnotherPacketSource> impl = haveVideo
                ? (AnotherPacketSource *)mParser->getSource(
                        ATSParser::VIDEO).get()
                : (AnotherPacketSource *)mParser->getSource(
                        ATSParser::AUDIO).get();
        int64_t prevBufferedDurationUs = 0;
        int64_t durationUs = -1;
        List<int64_t> durations;
        // Estimate duration --- stabilize until you get <500ms deviation.
        for (; feedMore() == OK && numPacketsParsed <= 10000;
                ++numPacketsParsed) {
            status_t err;
            int64_t bufferedDurationUs = impl->getBufferedDurationUs(&err);
            if (err != OK) {
                break;
            }
            if (bufferedDurationUs != prevBufferedDurationUs) {
                durationUs = size * bufferedDurationUs / mOffset;
                durations.push_back(durationUs);
                if (durations.size() > 5) {
                    durations.erase(durations.begin());
                    int64_t min = *durations.begin();
                    int64_t max = *durations.begin();
                    for (List<int64_t>::iterator i = durations.begin();
                            i != durations.end();
                            ++i) {
                        if (min > *i) {
                            min = *i;
                        }
                        if (max < *i) {
                            max = *i;
                        }
                    }
                    if (max - min < 500 * 1000) {
                        break;
                    }
                }
                prevBufferedDurationUs = bufferedDurationUs;
            }
        }
        if (durationUs > 0) {
            const sp<MetaData> meta = impl->getFormat();
            meta->setInt64(kKeyDuration, durationUs);
            impl->setFormat(meta);
        }
    }

    ALOGI("haveAudio=%d, haveVideo=%d", haveAudio, haveVideo);
}

@@ -213,12 +248,195 @@ status_t MPEG2TSExtractor::feedMore() {
        return (n < 0) ? (status_t)n : ERROR_END_OF_STREAM;
    }

    ATSParser::SyncEvent event(mOffset);
    mOffset += n;
    return mParser->feedTSPacket(packet, kTSPacketSize);
    status_t err = mParser->feedTSPacket(packet, kTSPacketSize, &event);
    if (event.isInit()) {
        for (size_t i = 0; i < mSourceImpls.size(); ++i) {
            if (mSourceImpls[i].get() == event.getMediaSource().get()) {
                mSyncPoints.editItemAt(i).add(
                        event.getTimeUs(), event.getOffset());
                break;
            }
        }
    }
    return err;
}

uint32_t MPEG2TSExtractor::flags() const {
    return CAN_PAUSE;
    return CAN_PAUSE | CAN_SEEK_BACKWARD | CAN_SEEK_FORWARD;
}

status_t MPEG2TSExtractor::seek(int64_t seekTimeUs,
        const MediaSource::ReadOptions::SeekMode &seekMode) {
    if (mSeekSyncPoints == NULL || mSeekSyncPoints->isEmpty()) {
        ALOGW("No sync point to seek to.");
        // ... and therefore we have nothing useful to do here.
        return OK;
    }

    // Determine whether we're seeking beyond the known area.
    bool shouldSeekBeyond =
            (seekTimeUs > mSeekSyncPoints->keyAt(mSeekSyncPoints->size() - 1));

    // Determine the sync point to seek.
    size_t index = 0;
    for (; index < mSeekSyncPoints->size(); ++index) {
        int64_t timeUs = mSeekSyncPoints->keyAt(index);
        if (timeUs > seekTimeUs) {
            break;
        }
    }

    switch (seekMode) {
        case MediaSource::ReadOptions::SEEK_NEXT_SYNC:
            if (index == mSeekSyncPoints->size()) {
                ALOGW("Next sync not found; starting from the latest sync.");
                --index;
            }
            break;
        case MediaSource::ReadOptions::SEEK_CLOSEST_SYNC:
        case MediaSource::ReadOptions::SEEK_CLOSEST:
            ALOGW("seekMode not supported: %d; falling back to PREVIOUS_SYNC",
                    seekMode);
            // fall-through
        case MediaSource::ReadOptions::SEEK_PREVIOUS_SYNC:
            if (index == 0) {
                ALOGW("Previous sync not found; starting from the earliest "
                        "sync.");
            } else {
                --index;
            }
            break;
    }
    if (!shouldSeekBeyond || mOffset <= mSeekSyncPoints->valueAt(index)) {
        int64_t actualSeekTimeUs = mSeekSyncPoints->keyAt(index);
        mOffset = mSeekSyncPoints->valueAt(index);
        status_t err = queueDiscontinuityForSeek(actualSeekTimeUs);
        if (err != OK) {
            return err;
        }
    }

    if (shouldSeekBeyond) {
        status_t err = seekBeyond(seekTimeUs);
        if (err != OK) {
            return err;
        }
    }

    // Fast-forward to sync frame.
    for (size_t i = 0; i < mSourceImpls.size(); ++i) {
        const sp<AnotherPacketSource> &impl = mSourceImpls[i];
        status_t err;
        feedUntilBufferAvailable(impl);
        while (impl->hasBufferAvailable(&err)) {
            sp<AMessage> meta = impl->getMetaAfterLastDequeued(0);
            sp<ABuffer> buffer;
            if (meta == NULL) {
                return UNKNOWN_ERROR;
            }
            int32_t sync;
            if (meta->findInt32("isSync", &sync) && sync) {
                break;
            }
            err = impl->dequeueAccessUnit(&buffer);
            if (err != OK) {
                return err;
            }
            feedUntilBufferAvailable(impl);
        }
    }

    return OK;
}

status_t MPEG2TSExtractor::queueDiscontinuityForSeek(int64_t actualSeekTimeUs) {
    // Signal discontinuity
    sp<AMessage> extra(new AMessage);
    extra->setInt64(IStreamListener::kKeyMediaTimeUs, actualSeekTimeUs);
    mParser->signalDiscontinuity(ATSParser::DISCONTINUITY_TIME, extra);

    // After discontinuity, impl should only have discontinuities
    // with the last being what we queued. Dequeue them all here.
    for (size_t i = 0; i < mSourceImpls.size(); ++i) {
        const sp<AnotherPacketSource> &impl = mSourceImpls.itemAt(i);
        sp<ABuffer> buffer;
        status_t err;
        while (impl->hasBufferAvailable(&err)) {
            if (err != OK) {
                return err;
            }
            err = impl->dequeueAccessUnit(&buffer);
            // If the source contains anything but discontinuity, that's
            // a programming mistake.
            CHECK(err == INFO_DISCONTINUITY);
        }
    }

    // Feed until we have a buffer for each source.
    for (size_t i = 0; i < mSourceImpls.size(); ++i) {
        const sp<AnotherPacketSource> &impl = mSourceImpls.itemAt(i);
        sp<ABuffer> buffer;
        status_t err = feedUntilBufferAvailable(impl);
        if (err != OK) {
            return err;
        }
    }

    return OK;
}

status_t MPEG2TSExtractor::seekBeyond(int64_t seekTimeUs) {
    // If we're seeking beyond where we know --- read until we reach there.
    size_t syncPointsSize = mSeekSyncPoints->size();

    while (seekTimeUs > mSeekSyncPoints->keyAt(
            mSeekSyncPoints->size() - 1)) {
        status_t err;
        if (syncPointsSize < mSeekSyncPoints->size()) {
            syncPointsSize = mSeekSyncPoints->size();
            int64_t syncTimeUs = mSeekSyncPoints->keyAt(syncPointsSize - 1);
            // Dequeue buffers before sync point in order to avoid too much
            // cache building up.
            sp<ABuffer> buffer;
            for (size_t i = 0; i < mSourceImpls.size(); ++i) {
                const sp<AnotherPacketSource> &impl = mSourceImpls[i];
                int64_t timeUs;
                while ((err = impl->nextBufferTime(&timeUs)) == OK) {
                    if (timeUs < syncTimeUs) {
                        impl->dequeueAccessUnit(&buffer);
                    } else {
                        break;
                    }
                }
                if (err != OK && err != -EWOULDBLOCK) {
                    return err;
                }
            }
        }
        if (feedMore() != OK) {
            return ERROR_END_OF_STREAM;
        }
    }

    return OK;
}

status_t MPEG2TSExtractor::feedUntilBufferAvailable(
        const sp<AnotherPacketSource> &impl) {
    status_t finalResult;
    while (!impl->hasBufferAvailable(&finalResult)) {
        if (finalResult != OK) {
            return finalResult;
        }

        status_t err = feedMore();
        if (err != OK) {
            impl->signalEOS(err);
        }
    }
    return OK;
}

////////////////////////////////////////////////////////////////////////////////