Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 84b784dd authored by Chong Zhang's avatar Chong Zhang Committed by Android (Google) Code Review
Browse files

Merge "HLS: bandwidth estimator changes"

parents 446ef202 538b6d22
Loading
Loading
Loading
Loading
+109 −38
Original line number Original line Diff line number Diff line
@@ -50,13 +50,75 @@
namespace android {
namespace android {


// static
// static
// Number of recently-read bytes to use for bandwidth estimation
const size_t LiveSession::kBandwidthHistoryBytes = 200 * 1024;
// High water mark to start up switch or report prepared)
// High water mark to start up switch or report prepared)
const int64_t LiveSession::kHighWaterMark = 8000000ll;
const int64_t LiveSession::kHighWaterMark = 8000000ll;
const int64_t LiveSession::kMidWaterMark = 5000000ll;
const int64_t LiveSession::kMidWaterMark = 5000000ll;
const int64_t LiveSession::kLowWaterMark = 3000000ll;
const int64_t LiveSession::kLowWaterMark = 3000000ll;


struct LiveSession::BandwidthEstimator : public RefBase {
    BandwidthEstimator();

    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
    bool estimateBandwidth(int32_t *bandwidth);

private:
    // Bandwidth estimation parameters
    static const int32_t kMaxBandwidthHistoryItems = 20;
    static const int64_t kMaxBandwidthHistoryWindowUs = 3000000ll; // 3 sec

    struct BandwidthEntry {
        int64_t mDelayUs;
        size_t mNumBytes;
    };

    Mutex mLock;
    List<BandwidthEntry> mBandwidthHistory;
    int64_t mTotalTransferTimeUs;
    size_t mTotalTransferBytes;

    DISALLOW_EVIL_CONSTRUCTORS(BandwidthEstimator);
};

LiveSession::BandwidthEstimator::BandwidthEstimator() :
    mTotalTransferTimeUs(0),
    mTotalTransferBytes(0) {
}

void LiveSession::BandwidthEstimator::addBandwidthMeasurement(
        size_t numBytes, int64_t delayUs) {
    AutoMutex autoLock(mLock);

    BandwidthEntry entry;
    entry.mDelayUs = delayUs;
    entry.mNumBytes = numBytes;
    mTotalTransferTimeUs += delayUs;
    mTotalTransferBytes += numBytes;
    mBandwidthHistory.push_back(entry);

    // trim old samples, keeping at least kMaxBandwidthHistoryItems samples,
    // and total transfer time at least kMaxBandwidthHistoryWindowUs.
    while (mBandwidthHistory.size() > kMaxBandwidthHistoryItems) {
        List<BandwidthEntry>::iterator it = mBandwidthHistory.begin();
        if (mTotalTransferTimeUs - it->mDelayUs < kMaxBandwidthHistoryWindowUs) {
            break;
        }
        mTotalTransferTimeUs -= it->mDelayUs;
        mTotalTransferBytes -= it->mNumBytes;
        mBandwidthHistory.erase(mBandwidthHistory.begin());
    }
}

bool LiveSession::BandwidthEstimator::estimateBandwidth(int32_t *bandwidthBps) {
    AutoMutex autoLock(mLock);

    if (mBandwidthHistory.size() < 2) {
        return false;
    }

    *bandwidthBps = ((double)mTotalTransferBytes * 8E6 / mTotalTransferTimeUs);
    return true;
}

LiveSession::LiveSession(
LiveSession::LiveSession(
        const sp<AMessage> &notify, uint32_t flags,
        const sp<AMessage> &notify, uint32_t flags,
        const sp<IMediaHTTPService> &httpService)
        const sp<IMediaHTTPService> &httpService)
@@ -66,10 +128,10 @@ LiveSession::LiveSession(
      mInPreparationPhase(true),
      mInPreparationPhase(true),
      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
      mCurBandwidthIndex(-1),
      mCurBandwidthIndex(-1),
      mBandwidthEstimator(new BandwidthEstimator()),
      mStreamMask(0),
      mStreamMask(0),
      mNewStreamMask(0),
      mNewStreamMask(0),
      mSwapMask(0),
      mSwapMask(0),
      mCheckBandwidthGeneration(0),
      mSwitchGeneration(0),
      mSwitchGeneration(0),
      mSubtitleGeneration(0),
      mSubtitleGeneration(0),
      mLastDequeuedTimeUs(0ll),
      mLastDequeuedTimeUs(0ll),
@@ -89,13 +151,6 @@ LiveSession::LiveSession(
        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
    }
    }

    size_t numHistoryItems = kBandwidthHistoryBytes /
            PlaylistFetcher::kDownloadBlockSize + 1;
    if (numHistoryItems < 5) {
        numHistoryItems = 5;
    }
    mHTTPDataSource->setBandwidthHistorySize(numHistoryItems);
}
}


LiveSession::~LiveSession() {
LiveSession::~LiveSession() {
@@ -948,8 +1003,15 @@ static double uniformRand() {
}
}
#endif
#endif


size_t LiveSession::getBandwidthIndex() {
void LiveSession::addBandwidthMeasurement(size_t numBytes, int64_t delayUs) {
    if (mBandwidthItems.size() == 0) {
    mBandwidthEstimator->addBandwidthMeasurement(numBytes, delayUs);
}

size_t LiveSession::getBandwidthIndex(int32_t bandwidthBps) {
    if (mBandwidthItems.size() < 2) {
        // shouldn't be here if we only have 1 bandwidth, check
        // logic to get rid of redundant bandwidth polling
        ALOGW("getBandwidthIndex() called for single bandwidth playlist!");
        return 0;
        return 0;
    }
    }


@@ -967,15 +1029,6 @@ size_t LiveSession::getBandwidthIndex() {
    }
    }


    if (index < 0) {
    if (index < 0) {
        int32_t bandwidthBps;
        if (mHTTPDataSource != NULL
                && mHTTPDataSource->estimateBandwidth(&bandwidthBps)) {
            ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
        } else {
            ALOGV("no bandwidth estimate.");
            return 0;  // Pick the lowest bandwidth stream by default.
        }

        char value[PROPERTY_VALUE_MAX];
        char value[PROPERTY_VALUE_MAX];
        if (property_get("media.httplive.max-bw", value, NULL)) {
        if (property_get("media.httplive.max-bw", value, NULL)) {
            char *end;
            char *end;
@@ -992,15 +1045,9 @@ size_t LiveSession::getBandwidthIndex() {


        index = mBandwidthItems.size() - 1;
        index = mBandwidthItems.size() - 1;
        while (index > 0) {
        while (index > 0) {
            // consider only 80% of the available bandwidth, but if we are switching up,
            // be conservative (70%) to avoid overestimating and immediately
            // be even more conservative (70%) to avoid overestimating and immediately
            // switching down again.
            // switching back.
            size_t adjustedBandwidthBps = bandwidthBps * 7 / 10;
            size_t adjustedBandwidthBps = bandwidthBps;
            if (index > mCurBandwidthIndex) {
                adjustedBandwidthBps = adjustedBandwidthBps * 7 / 10;
            } else {
                adjustedBandwidthBps = adjustedBandwidthBps * 8 / 10;
            }
            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
            if (mBandwidthItems.itemAt(index).mBandwidth <= adjustedBandwidthBps) {
                break;
                break;
            }
            }
@@ -1577,9 +1624,9 @@ void LiveSession::cancelPollBuffering() {


void LiveSession::onPollBuffering() {
void LiveSession::onPollBuffering() {
    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
    ALOGV("onPollBuffering: mSwitchInProgress %d, mReconfigurationInProgress %d, "
            "mInPreparationPhase %d, mStreamMask 0x%x",
            "mInPreparationPhase %d, mCurBandwidthIndex %d, mStreamMask 0x%x",
        mSwitchInProgress, mReconfigurationInProgress,
        mSwitchInProgress, mReconfigurationInProgress,
        mInPreparationPhase, mStreamMask);
        mInPreparationPhase, mCurBandwidthIndex, mStreamMask);


    bool low, mid, high;
    bool low, mid, high;
    if (checkBuffering(low, mid, high)) {
    if (checkBuffering(low, mid, high)) {
@@ -1588,8 +1635,8 @@ void LiveSession::onPollBuffering() {
        }
        }


        // don't switch before we report prepared
        // don't switch before we report prepared
        if (!mInPreparationPhase && (low || high)) {
        if (!mInPreparationPhase) {
            switchBandwidthIfNeeded(high);
            switchBandwidthIfNeeded(high, !mid);
        }
        }
    }
    }


@@ -1704,11 +1751,35 @@ bool LiveSession::checkBuffering(bool &low, bool &mid, bool &high) {
    return false;
    return false;
}
}


void LiveSession::switchBandwidthIfNeeded(bool canSwitchUp) {
void LiveSession::switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow) {
    ssize_t bandwidthIndex = getBandwidthIndex();
    // no need to check bandwidth if we only have 1 bandwidth settings
    if (mBandwidthItems.size() < 2) {
        return;
    }

    int32_t bandwidthBps;
    if (mBandwidthEstimator->estimateBandwidth(&bandwidthBps)) {
        ALOGV("bandwidth estimated at %.2f kbps", bandwidthBps / 1024.0f);
    } else {
        ALOGV("no bandwidth estimate.");
        return;
    }

    int32_t curBandwidth = mBandwidthItems.itemAt(mCurBandwidthIndex).mBandwidth;
    bool bandwidthLow = bandwidthBps < (int32_t)curBandwidth * 8 / 10;
    bool bandwidthHigh = bandwidthBps > (int32_t)curBandwidth * 12 / 10;

    if ((bufferHigh && bandwidthHigh) || (bufferLow && bandwidthLow)) {
        ssize_t bandwidthIndex = getBandwidthIndex(bandwidthBps);

        if (bandwidthIndex == mCurBandwidthIndex
                || (bufferHigh && bandwidthIndex < mCurBandwidthIndex)
                || (bufferLow && bandwidthIndex > mCurBandwidthIndex)) {
            return;
        }


    if ((canSwitchUp && bandwidthIndex > mCurBandwidthIndex)
        ALOGI("#### Initiate Bandwidth Switch: %d => %d",
            || (!canSwitchUp && bandwidthIndex < mCurBandwidthIndex)) {
                mCurBandwidthIndex, bandwidthIndex);
        changeConfiguration(-1, bandwidthIndex, false);
        changeConfiguration(-1, bandwidthIndex, false);
    }
    }
}
}
+5 −5
Original line number Original line Diff line number Diff line
@@ -106,7 +106,6 @@ private:
        kWhatDisconnect                 = 'disc',
        kWhatDisconnect                 = 'disc',
        kWhatSeek                       = 'seek',
        kWhatSeek                       = 'seek',
        kWhatFetcherNotify              = 'notf',
        kWhatFetcherNotify              = 'notf',
        kWhatCheckBandwidth             = 'bndw',
        kWhatChangeConfiguration        = 'chC0',
        kWhatChangeConfiguration        = 'chC0',
        kWhatChangeConfiguration2       = 'chC2',
        kWhatChangeConfiguration2       = 'chC2',
        kWhatChangeConfiguration3       = 'chC3',
        kWhatChangeConfiguration3       = 'chC3',
@@ -115,11 +114,11 @@ private:
        kWhatPollBuffering              = 'poll',
        kWhatPollBuffering              = 'poll',
    };
    };


    static const size_t kBandwidthHistoryBytes;
    static const int64_t kHighWaterMark;
    static const int64_t kHighWaterMark;
    static const int64_t kMidWaterMark;
    static const int64_t kMidWaterMark;
    static const int64_t kLowWaterMark;
    static const int64_t kLowWaterMark;


    struct BandwidthEstimator;
    struct BandwidthItem {
    struct BandwidthItem {
        size_t mPlaylistIndex;
        size_t mPlaylistIndex;
        unsigned long mBandwidth;
        unsigned long mBandwidth;
@@ -170,6 +169,7 @@ private:


    Vector<BandwidthItem> mBandwidthItems;
    Vector<BandwidthItem> mBandwidthItems;
    ssize_t mCurBandwidthIndex;
    ssize_t mCurBandwidthIndex;
    sp<BandwidthEstimator> mBandwidthEstimator;


    sp<M3UParser> mPlaylist;
    sp<M3UParser> mPlaylist;


@@ -195,7 +195,6 @@ private:
    // * a forced bandwidth switch termination in cancelSwitch on the live looper.
    // * a forced bandwidth switch termination in cancelSwitch on the live looper.
    Mutex mSwapMutex;
    Mutex mSwapMutex;


    int32_t mCheckBandwidthGeneration;
    int32_t mSwitchGeneration;
    int32_t mSwitchGeneration;
    int32_t mSubtitleGeneration;
    int32_t mSubtitleGeneration;


@@ -249,7 +248,8 @@ private:
    sp<M3UParser> fetchPlaylist(
    sp<M3UParser> fetchPlaylist(
            const char *url, uint8_t *curPlaylistHash, bool *unchanged);
            const char *url, uint8_t *curPlaylistHash, bool *unchanged);


    size_t getBandwidthIndex();
    void addBandwidthMeasurement(size_t numBytes, int64_t delayUs);
    size_t getBandwidthIndex(int32_t bandwidthBps);
    int64_t latestMediaSegmentStartTimeUs();
    int64_t latestMediaSegmentStartTimeUs();


    static int SortByBandwidth(const BandwidthItem *, const BandwidthItem *);
    static int SortByBandwidth(const BandwidthItem *, const BandwidthItem *);
@@ -273,7 +273,7 @@ private:
    void cancelPollBuffering();
    void cancelPollBuffering();
    void onPollBuffering();
    void onPollBuffering();
    bool checkBuffering(bool &low, bool &mid, bool &high);
    bool checkBuffering(bool &low, bool &mid, bool &high);
    void switchBandwidthIfNeeded(bool canSwitchUp);
    void switchBandwidthIfNeeded(bool bufferHigh, bool bufferLow);


    void finishDisconnect();
    void finishDisconnect();


+35 −26
Original line number Original line Diff line number Diff line
@@ -648,23 +648,23 @@ void PlaylistFetcher::onMonitorQueue() {
        targetDurationUs = targetDurationSecs * 1000000ll;
        targetDurationUs = targetDurationSecs * 1000000ll;
    }
    }


    int64_t durationToBufferUs = kMinBufferedDurationUs;

    int64_t bufferedDurationUs = 0ll;
    int64_t bufferedDurationUs = 0ll;
    status_t finalResult = NOT_ENOUGH_DATA;
    status_t finalResult = OK;
    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
    if (mStreamTypeMask == LiveSession::STREAMTYPE_SUBTITLES) {
        sp<AnotherPacketSource> packetSource =
        sp<AnotherPacketSource> packetSource =
            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);
            mPacketSources.valueFor(LiveSession::STREAMTYPE_SUBTITLES);


        bufferedDurationUs =
        bufferedDurationUs =
                packetSource->getBufferedDurationUs(&finalResult);
                packetSource->getBufferedDurationUs(&finalResult);
        finalResult = OK;
    } else {
    } else {
        // Use max stream duration to prevent us from waiting on a non-existent stream;
        // Use min stream duration, but ignore streams that never have any packet
        // when we cannot make out from the manifest what streams are included in a playlist
        // enqueued to prevent us from waiting on a non-existent stream;
        // we might assume extra streams.
        // when we cannot make out from the manifest what streams are included in
        // a playlist we might assume extra streams.
        bufferedDurationUs = -1ll;
        for (size_t i = 0; i < mPacketSources.size(); ++i) {
        for (size_t i = 0; i < mPacketSources.size(); ++i) {
            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0) {
            if ((mStreamTypeMask & mPacketSources.keyAt(i)) == 0
                    || mPacketSources[i]->getLatestEnqueuedMeta() == NULL) {
                continue;
                continue;
            }
            }


@@ -672,24 +672,19 @@ void PlaylistFetcher::onMonitorQueue() {
                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
                mPacketSources.valueAt(i)->getBufferedDurationUs(&finalResult);
            ALOGV("buffered %" PRId64 " for stream %d",
            ALOGV("buffered %" PRId64 " for stream %d",
                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
                    bufferedStreamDurationUs, mPacketSources.keyAt(i));
            if (bufferedStreamDurationUs > bufferedDurationUs) {
            if (bufferedDurationUs == -1ll
                 || bufferedStreamDurationUs < bufferedDurationUs) {
                bufferedDurationUs = bufferedStreamDurationUs;
                bufferedDurationUs = bufferedStreamDurationUs;
            }
            }
        }
        }
        if (bufferedDurationUs == -1ll) {
            bufferedDurationUs = 0ll;
        }
        }
    downloadMore = (bufferedDurationUs < durationToBufferUs);

    // signal start if buffered up at least the target size
    if (!mPrepared && bufferedDurationUs > targetDurationUs && downloadMore) {
        mPrepared = true;

        ALOGV("prepared, buffered=%" PRId64 " > %" PRId64 "",
                bufferedDurationUs, targetDurationUs);
    }
    }


    if (finalResult == OK && downloadMore) {
    if (finalResult == OK && bufferedDurationUs < kMinBufferedDurationUs) {
        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
        ALOGV("monitoring, buffered=%" PRId64 " < %" PRId64 "",
                bufferedDurationUs, durationToBufferUs);
                bufferedDurationUs, kMinBufferedDurationUs);
        // delay the next download slightly; hopefully this gives other concurrent fetchers
        // delay the next download slightly; hopefully this gives other concurrent fetchers
        // a better chance to run.
        // a better chance to run.
        // onDownloadNext();
        // onDownloadNext();
@@ -697,13 +692,16 @@ void PlaylistFetcher::onMonitorQueue() {
        msg->setInt32("generation", mMonitorQueueGeneration);
        msg->setInt32("generation", mMonitorQueueGeneration);
        msg->post(1000l);
        msg->post(1000l);
    } else {
    } else {
        // Nothing to do yet, try again in a second.
        // We'd like to maintain buffering above durationToBufferUs, so try
        int64_t delayUs = mPrepared ? kMaxMonitorDelayUs : targetDurationUs / 2;
        // again when buffer just about to go below durationToBufferUs
        // (or after targetDurationUs / 2, whichever is smaller).
        int64_t delayUs = bufferedDurationUs - kMinBufferedDurationUs + 1000000ll;
        if (delayUs > targetDurationUs / 2) {
            delayUs = targetDurationUs / 2;
        }
        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
        ALOGV("pausing for %" PRId64 ", buffered=%" PRId64 " > %" PRId64 "",
                delayUs, bufferedDurationUs, durationToBufferUs);
                delayUs, bufferedDurationUs, kMinBufferedDurationUs);
        // :TRICKY: need to enforce minimum delay because the delay to
        postMonitorQueue(delayUs);
        // refresh the playlist will become 0
        postMonitorQueue(delayUs, mPrepared ? targetDurationUs * 2 : 0);
    }
    }
}
}


@@ -986,9 +984,20 @@ void PlaylistFetcher::onDownloadNext() {
    // block-wise download
    // block-wise download
    ssize_t bytesRead;
    ssize_t bytesRead;
    do {
    do {
        int64_t startUs = ALooper::GetNowUs();

        bytesRead = mSession->fetchFile(
        bytesRead = mSession->fetchFile(
                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);
                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);


        // add sample for bandwidth estimation (excluding subtitles)
        if (bytesRead > 0
                && (mStreamTypeMask
                        & (LiveSession::STREAMTYPE_AUDIO
                        | LiveSession::STREAMTYPE_VIDEO))) {
            int64_t delayUs = ALooper::GetNowUs() - startUs;
            mSession->addBandwidthMeasurement(bytesRead, delayUs);
        }

        if (bytesRead < 0) {
        if (bytesRead < 0) {
            status_t err = bytesRead;
            status_t err = bytesRead;
            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());