Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0dd229bb authored by Robert Shih's avatar Robert Shih
Browse files

AnotherPacketSource: make getBufferedDurationUs more discontinuity-aware

The new getBufferedDurationUs implementation obsoletes the purpose of
getEstimatedDurationUs; remove getEstimatedDurationUs and its
associated member variables. Finally replace calls to
getEstimatedDurationUs with getBufferedDurationUs.

Change-Id: I38f20df8e177ffbfe299b203d99076fc98dcd274
parent c9c3804a
Loading
Loading
Loading
Loading
+2 −1
Original line number Diff line number Diff line
@@ -2262,8 +2262,9 @@ bool LiveSession::checkBuffering(
            continue;
        }

        status_t finalResult;
        int64_t bufferedDurationUs =
                mPacketSources[i]->getEstimatedDurationUs();
                mPacketSources[i]->getBufferedDurationUs(&finalResult);
        ALOGV("[%s] buffered %lld us",
                getNameForStream(mPacketSources.keyAt(i)),
                (long long)bufferedDurationUs);
+81 −99
Original line number Diff line number Diff line
@@ -46,9 +46,10 @@ AnotherPacketSource::AnotherPacketSource(const sp<MetaData> &meta)
      mLastQueuedTimeUs(0),
      mEOSResult(OK),
      mLatestEnqueuedMeta(NULL),
      mLatestDequeuedMeta(NULL),
      mQueuedDiscontinuityCount(0) {
      mLatestDequeuedMeta(NULL) {
    setFormat(meta);

    mDiscontinuitySegments.push_back(DiscontinuitySegment());
}

void AnotherPacketSource::setFormat(const sp<MetaData> &meta) {
@@ -129,11 +130,20 @@ status_t AnotherPacketSource::dequeueAccessUnit(sp<ABuffer> *buffer) {
                mFormat.clear();
            }

            --mQueuedDiscontinuityCount;
            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
            // CHECK(!mDiscontinuitySegments.empty());
            return INFO_DISCONTINUITY;
        }

        // CHECK(!mDiscontinuitySegments.empty());
        DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();

        int64_t timeUs;
        mLatestDequeuedMeta = (*buffer)->meta()->dup();
        CHECK(mLatestDequeuedMeta->findInt64("timeUs", &timeUs));
        if (timeUs > seg.mMaxDequeTimeUs) {
            seg.mMaxDequeTimeUs = timeUs;
        }

        sp<RefBase> object;
        if ((*buffer)->meta()->findObject("format", &object)) {
@@ -172,6 +182,8 @@ status_t AnotherPacketSource::read(
                mFormat.clear();
            }

            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
            // CHECK(!mDiscontinuitySegments.empty());
            return INFO_DISCONTINUITY;
        }

@@ -184,6 +196,11 @@ status_t AnotherPacketSource::read(

        int64_t timeUs;
        CHECK(buffer->meta()->findInt64("timeUs", &timeUs));
        // CHECK(!mDiscontinuitySegments.empty());
        DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
        if (timeUs > seg.mMaxDequeTimeUs) {
            seg.mMaxDequeTimeUs = timeUs;
        }

        MediaBuffer *mediaBuffer = new MediaBuffer(buffer);

@@ -227,11 +244,13 @@ void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {

    int32_t discontinuity;
    if (buffer->meta()->findInt32("discontinuity", &discontinuity)){
        // discontinuity handling needs to be consistent with queueDiscontinuity()
        ++mQueuedDiscontinuityCount;
        ALOGV("queueing a discontinuity with queueAccessUnit");

        mLastQueuedTimeUs = 0ll;
        mEOSResult = OK;
        mLatestEnqueuedMeta = NULL;

        mDiscontinuitySegments.push_back(DiscontinuitySegment());
        return;
    }

@@ -241,6 +260,15 @@ void AnotherPacketSource::queueAccessUnit(const sp<ABuffer> &buffer) {
    ALOGV("queueAccessUnit timeUs=%" PRIi64 " us (%.2f secs)",
            mLastQueuedTimeUs, mLastQueuedTimeUs / 1E6);

    // CHECK(!mDiscontinuitySegments.empty());
    DiscontinuitySegment &tailSeg = *(--mDiscontinuitySegments.end());
    if (lastQueuedTimeUs > tailSeg.mMaxEnqueTimeUs) {
        tailSeg.mMaxEnqueTimeUs = lastQueuedTimeUs;
    }
    if (tailSeg.mMaxDequeTimeUs == -1) {
        tailSeg.mMaxDequeTimeUs = lastQueuedTimeUs;
    }

    if (mLatestEnqueuedMeta == NULL) {
        mLatestEnqueuedMeta = buffer->meta()->dup();
    } else {
@@ -264,7 +292,9 @@ void AnotherPacketSource::clear() {

    mBuffers.clear();
    mEOSResult = OK;
    mQueuedDiscontinuityCount = 0;

    mDiscontinuitySegments.clear();
    mDiscontinuitySegments.push_back(DiscontinuitySegment());

    mFormat = NULL;
    mLatestEnqueuedMeta = NULL;
@@ -291,6 +321,14 @@ void AnotherPacketSource::queueDiscontinuity(

            ++it;
        }

        for (List<DiscontinuitySegment>::iterator it2 = mDiscontinuitySegments.begin();
                it2 != mDiscontinuitySegments.end();
                ++it2) {
            DiscontinuitySegment &seg = *it2;
            seg.clear();
        }

    }

    mEOSResult = OK;
@@ -301,7 +339,8 @@ void AnotherPacketSource::queueDiscontinuity(
        return;
    }

    ++mQueuedDiscontinuityCount;
    mDiscontinuitySegments.push_back(DiscontinuitySegment());

    sp<ABuffer> buffer = new ABuffer(0);
    buffer->meta()->setInt32("discontinuity", static_cast<int32_t>(type));
    buffer->meta()->setMessage("extra", extra);
@@ -352,95 +391,19 @@ bool AnotherPacketSource::hasDataBufferAvailable(status_t *finalResult) {

int64_t AnotherPacketSource::getBufferedDurationUs(status_t *finalResult) {
    Mutex::Autolock autoLock(mLock);
    return getBufferedDurationUs_l(finalResult);
}

int64_t AnotherPacketSource::getBufferedDurationUs_l(status_t *finalResult) {
    *finalResult = mEOSResult;

    if (mBuffers.empty()) {
        return 0;
    }

    int64_t time1 = -1;
    int64_t time2 = -1;
    int64_t durationUs = 0;

    List<sp<ABuffer> >::iterator it;
    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
        const sp<ABuffer> &buffer = *it;

        int32_t discard;
        if (buffer->meta()->findInt32("discard", &discard) && discard) {
            continue;
        }

        int64_t timeUs;
        if (buffer->meta()->findInt64("timeUs", &timeUs)) {
            if (time1 < 0 || timeUs < time1) {
                time1 = timeUs;
            }

            if (time2 < 0 || timeUs > time2) {
                time2 = timeUs;
            }
        } else {
            // This is a discontinuity, reset everything.
            durationUs += time2 - time1;
            time1 = time2 = -1;
        }
    }

    return durationUs + (time2 - time1);
}

// A cheaper but less precise version of getBufferedDurationUs that we would like to use in
// LiveSession::dequeueAccessUnit to trigger downwards adaptation.
int64_t AnotherPacketSource::getEstimatedDurationUs() {
    Mutex::Autolock autoLock(mLock);
    if (mBuffers.empty()) {
        return 0;
    }

    if (mQueuedDiscontinuityCount > 0) {
        status_t finalResult;
        return getBufferedDurationUs_l(&finalResult);
    }

    sp<ABuffer> buffer;
    int32_t discard;
    int64_t startTimeUs = -1ll;
    List<sp<ABuffer> >::iterator it;
    for (it = mBuffers.begin(); it != mBuffers.end(); it++) {
        buffer = *it;
        if (buffer->meta()->findInt32("discard", &discard) && discard) {
            continue;
        }
        buffer->meta()->findInt64("timeUs", &startTimeUs);
        break;
    for (List<DiscontinuitySegment>::iterator it = mDiscontinuitySegments.begin();
            it != mDiscontinuitySegments.end();
            ++it) {
        const DiscontinuitySegment &seg = *it;
        // dequeued access units should be a subset of enqueued access units
        // CHECK(seg.maxEnqueTimeUs >= seg.mMaxDequeTimeUs);
        durationUs += (seg.mMaxEnqueTimeUs - seg.mMaxDequeTimeUs);
    }

    if (startTimeUs < 0) {
        return 0;
    }

    it = mBuffers.end();
    --it;
    buffer = *it;

    int64_t endTimeUs;
    buffer->meta()->findInt64("timeUs", &endTimeUs);
    if (endTimeUs < 0) {
        return 0;
    }

    int64_t diffUs;
    if (endTimeUs > startTimeUs) {
        diffUs = endTimeUs - startTimeUs;
    } else {
        diffUs = startTimeUs - endTimeUs;
    }
    return diffUs;
    return durationUs;
}

status_t AnotherPacketSource::nextBufferTime(int64_t *timeUs) {
@@ -540,14 +503,15 @@ void AnotherPacketSource::trimBuffersAfterMeta(
            stopTime.mSeq, (long long)stopTime.mTimeUs);

    List<sp<ABuffer> >::iterator it;
    List<DiscontinuitySegment >::iterator it2;
    sp<AMessage> newLatestEnqueuedMeta = NULL;
    int64_t newLastQueuedTimeUs = 0;
    size_t newDiscontinuityCount = 0;
    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
    for (it = mBuffers.begin(), it2 = mDiscontinuitySegments.begin(); it != mBuffers.end(); ++it) {
        const sp<ABuffer> &buffer = *it;
        int32_t discontinuity;
        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
            newDiscontinuityCount++;
            // CHECK(it2 != mDiscontinuitySegments.end());
            ++it2;
            continue;
        }

@@ -560,10 +524,18 @@ void AnotherPacketSource::trimBuffersAfterMeta(
        newLatestEnqueuedMeta = buffer->meta();
        newLastQueuedTimeUs = curTime.mTimeUs;
    }

    mBuffers.erase(it, mBuffers.end());
    mLatestEnqueuedMeta = newLatestEnqueuedMeta;
    mLastQueuedTimeUs = newLastQueuedTimeUs;
    mQueuedDiscontinuityCount = newDiscontinuityCount;

    DiscontinuitySegment &seg = *it2;
    if (newLatestEnqueuedMeta != NULL) {
        seg.mMaxEnqueTimeUs = newLastQueuedTimeUs;
    } else {
        seg.clear();
    }
    mDiscontinuitySegments.erase(++it2, mDiscontinuitySegments.end());
}

/*
@@ -580,6 +552,7 @@ sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
            startTime.mSeq, (long long)startTime.mTimeUs);

    sp<AMessage> firstMeta;
    int64_t firstTimeUs = -1;
    Mutex::Autolock autoLock(mLock);
    if (mBuffers.empty()) {
        return NULL;
@@ -589,14 +562,14 @@ sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
    bool isAvc = false;

    List<sp<ABuffer> >::iterator it;
    size_t discontinuityCount = 0;
    for (it = mBuffers.begin(); it != mBuffers.end(); ++it) {
        const sp<ABuffer> &buffer = *it;
        int32_t discontinuity;
        if (buffer->meta()->findInt32("discontinuity", &discontinuity)) {
            mDiscontinuitySegments.erase(mDiscontinuitySegments.begin());
            // CHECK(!mDiscontinuitySegments.empty());
            format = NULL;
            isAvc = false;
            discontinuityCount++;
            continue;
        }
        if (format == NULL) {
@@ -618,12 +591,21 @@ sp<AMessage> AnotherPacketSource::trimBuffersBeforeMeta(
            ALOGV("trimming from beginning to %lld (not inclusive)",
                    (long long)curTime.mTimeUs);
            firstMeta = buffer->meta();
            firstTimeUs = curTime.mTimeUs;
            break;
        }
    }
    mBuffers.erase(mBuffers.begin(), it);
    mQueuedDiscontinuityCount -= discontinuityCount;
    mLatestDequeuedMeta = NULL;

    // CHECK(!mDiscontinuitySegments.empty());
    DiscontinuitySegment &seg = *mDiscontinuitySegments.begin();
    if (firstTimeUs >= 0) {
        seg.mMaxDequeTimeUs = firstTimeUs;
    } else {
        seg.clear();
    }

    return firstMeta;
}

+19 −5
Original line number Diff line number Diff line
@@ -53,8 +53,6 @@ struct AnotherPacketSource : public MediaSource {
    // presentation timestamps since the last discontinuity (if any).
    int64_t getBufferedDurationUs(status_t *finalResult);

    int64_t getEstimatedDurationUs();

    status_t nextBufferTime(int64_t *timeUs);

    void queueAccessUnit(const sp<ABuffer> &buffer);
@@ -84,6 +82,25 @@ protected:
    virtual ~AnotherPacketSource();

private:

    struct DiscontinuitySegment {
        int64_t mMaxDequeTimeUs, mMaxEnqueTimeUs;
        DiscontinuitySegment()
            : mMaxDequeTimeUs(-1),
              mMaxEnqueTimeUs(-1) {
        };

        void clear() {
            mMaxDequeTimeUs = mMaxEnqueTimeUs = -1;
        }
    };

    // Discontinuity segments are consecutive access units between
    // discontinuity markers. There should always be at least _ONE_
    // discontinuity segment, hence the various CHECKs in
    // AnotherPacketSource.cpp for non-empty()-ness.
    List<DiscontinuitySegment> mDiscontinuitySegments;

    Mutex mLock;
    Condition mCondition;

@@ -97,10 +114,7 @@ private:
    sp<AMessage> mLatestEnqueuedMeta;
    sp<AMessage> mLatestDequeuedMeta;

    size_t  mQueuedDiscontinuityCount;

    bool wasFormatChange(int32_t discontinuityType) const;
    int64_t getBufferedDurationUs_l(status_t *finalResult);

    DISALLOW_EVIL_CONSTRUCTORS(AnotherPacketSource);
};