Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1543d3c7 authored by Robert Shih's avatar Robert Shih
Browse files

Initial HLS seamless switch implementation.

Bug: 11854054
Change-Id: I75fc2a258111295039ac13cc37e407df25891dd2
parent 68d074fe
Loading
Loading
Loading
Loading
+250 −30
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@
#include <media/stagefright/MetaData.h>
#include <media/stagefright/Utils.h>

#include <utils/Mutex.h>

#include <ctype.h>
#include <openssl/aes.h>
#include <openssl/md5.h>
@@ -56,10 +58,14 @@ LiveSession::LiveSession(
      mHTTPDataSource(new MediaHTTP(mHTTPService->makeHTTPConnection())),
      mPrevBandwidthIndex(-1),
      mStreamMask(0),
      mNewStreamMask(0),
      mSwapMask(0),
      mCheckBandwidthGeneration(0),
      mSwitchGeneration(0),
      mLastDequeuedTimeUs(0ll),
      mRealTimeBaseUs(0ll),
      mReconfigurationInProgress(false),
      mSwitchInProgress(false),
      mDisconnectReplyID(0) {

    mStreams[kAudioIndex] = StreamItem("audio");
@@ -68,16 +74,37 @@ LiveSession::LiveSession(

    for (size_t i = 0; i < kMaxStreams; ++i) {
        mPacketSources.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
        mPacketSources2.add(indexToType(i), new AnotherPacketSource(NULL /* meta */));
    }
}

LiveSession::~LiveSession() {
}

sp<ABuffer> LiveSession::createFormatChangeBuffer(bool swap) {
    ABuffer *discontinuity = new ABuffer(0);
    discontinuity->meta()->setInt32("discontinuity", ATSParser::DISCONTINUITY_FORMATCHANGE);
    discontinuity->meta()->setInt32("swapPacketSource", swap);
    discontinuity->meta()->setInt32("switchGeneration", mSwitchGeneration);
    discontinuity->meta()->setInt64("timeUs", -1);
    return discontinuity;
}

void LiveSession::swapPacketSource(StreamType stream) {
    sp<AnotherPacketSource> &aps = mPacketSources.editValueFor(stream);
    sp<AnotherPacketSource> &aps2 = mPacketSources2.editValueFor(stream);
    sp<AnotherPacketSource> tmp = aps;
    aps = aps2;
    aps2 = tmp;
    aps2->clear();
}

status_t LiveSession::dequeueAccessUnit(
        StreamType stream, sp<ABuffer> *accessUnit) {
    if (!(mStreamMask & stream)) {
        return UNKNOWN_ERROR;
        // return -EWOULDBLOCK to avoid halting the decoder
        // when switching between audio/video and audio only.
        return -EWOULDBLOCK;
    }

    sp<AnotherPacketSource> packetSource = mPacketSources.valueFor(stream);
@@ -117,6 +144,25 @@ status_t LiveSession::dequeueAccessUnit(
              streamStr,
              type,
              extra == NULL ? "NULL" : extra->debugString().c_str());

        int32_t swap;
        if (type == ATSParser::DISCONTINUITY_FORMATCHANGE
                && (*accessUnit)->meta()->findInt32("swapPacketSource", &swap)
                && swap) {

            int32_t switchGeneration;
            CHECK((*accessUnit)->meta()->findInt32("switchGeneration", &switchGeneration));
            {
                Mutex::Autolock lock(mSwapMutex);
                if (switchGeneration == mSwitchGeneration) {
                    swapPacketSource(stream);
                    sp<AMessage> msg = new AMessage(kWhatSwapped, id());
                    msg->setInt32("stream", stream);
                    msg->setInt32("switchGeneration", switchGeneration);
                    msg->post();
                }
            }
        }
    } else if (err == OK) {
        if (stream == STREAMTYPE_AUDIO || stream == STREAMTYPE_VIDEO) {
            int64_t timeUs;
@@ -138,6 +184,7 @@ status_t LiveSession::dequeueAccessUnit(
}

status_t LiveSession::getStreamFormat(StreamType stream, sp<AMessage> *format) {
    // No swapPacketSource race condition; called from the same thread as dequeueAccessUnit.
    if (!(mStreamMask & stream)) {
        return UNKNOWN_ERROR;
    }
@@ -234,7 +281,12 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
                    if (what == PlaylistFetcher::kWhatStopped) {
                        AString uri;
                        CHECK(msg->findString("uri", &uri));
                        mFetcherInfos.removeItem(uri);
                        if (mFetcherInfos.removeItem(uri) < 0) {
                            // ignore duplicated kWhatStopped messages.
                            break;
                        }

                        tryToFinishBandwidthSwitch();
                    }

                    if (mContinuation != NULL) {
@@ -270,6 +322,8 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
                        postPrepared(err);
                    }

                    cancelBandwidthSwitch();

                    mPacketSources.valueFor(STREAMTYPE_AUDIO)->signalEOS(err);

                    mPacketSources.valueFor(STREAMTYPE_VIDEO)->signalEOS(err);
@@ -308,6 +362,27 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
                    break;
                }

                case PlaylistFetcher::kWhatStartedAt:
                {
                    int32_t switchGeneration;
                    CHECK(msg->findInt32("switchGeneration", &switchGeneration));

                    if (switchGeneration != mSwitchGeneration) {
                        break;
                    }

                    // Resume fetcher for the original variant; the resumed fetcher should
                    // continue until the timestamps found in msg, which is stored by the
                    // new fetcher to indicate where the new variant has started buffering.
                    for (size_t i = 0; i < mFetcherInfos.size(); i++) {
                        const FetcherInfo info = mFetcherInfos.valueAt(i);
                        if (info.mToBeRemoved) {
                            info.mFetcher->resumeUntilAsync(msg);
                        }
                    }
                    break;
                }

                default:
                    TRESPASS();
            }
@@ -352,6 +427,11 @@ void LiveSession::onMessageReceived(const sp<AMessage> &msg) {
            break;
        }

        case kWhatSwapped:
        {
            onSwapped(msg);
            break;
        }
        default:
            TRESPASS();
            break;
@@ -462,6 +542,10 @@ void LiveSession::finishDisconnect() {
    // during disconnection either.
    cancelCheckBandwidthEvent();

    // Protect mPacketSources from a swapPacketSource race condition through disconnect.
    // (finishDisconnect, onFinishDisconnect2)
    cancelBandwidthSwitch();

    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        mFetcherInfos.valueAt(i).mFetcher->stopAsync();
    }
@@ -501,11 +585,13 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {

    sp<AMessage> notify = new AMessage(kWhatFetcherNotify, id());
    notify->setString("uri", uri);
    notify->setInt32("switchGeneration", mSwitchGeneration);

    FetcherInfo info;
    info.mFetcher = new PlaylistFetcher(notify, this, uri);
    info.mDurationUs = -1ll;
    info.mIsPrepared = false;
    info.mToBeRemoved = false;
    looper()->registerHandler(info.mFetcher);

    mFetcherInfos.add(uri, info);
@@ -845,8 +931,25 @@ status_t LiveSession::selectTrack(size_t index, bool select) {
    return err;
}

bool LiveSession::canSwitchUp() {
    // Allow upwards bandwidth switch when a stream has buffered at least 10 seconds.
    status_t err = OK;
    for (size_t i = 0; i < mPacketSources.size(); ++i) {
        sp<AnotherPacketSource> source = mPacketSources.valueAt(i);
        int64_t dur = source->getBufferedDurationUs(&err);
        if (err == OK && dur > 10000000) {
            return true;
        }
    }
    return false;
}

void LiveSession::changeConfiguration(
        int64_t timeUs, size_t bandwidthIndex, bool pickTrack) {
    // Protect mPacketSources from a swapPacketSource race condition through reconfiguration.
    // (changeConfiguration, onChangeConfiguration2, onChangeConfiguration3).
    cancelBandwidthSwitch();

    CHECK(!mReconfigurationInProgress);
    mReconfigurationInProgress = true;

@@ -862,7 +965,8 @@ void LiveSession::changeConfiguration(
    CHECK_LT(bandwidthIndex, mBandwidthItems.size());
    const BandwidthItem &item = mBandwidthItems.itemAt(bandwidthIndex);

    uint32_t streamMask = 0;
    uint32_t streamMask = 0; // streams that should be fetched by the new fetcher
    uint32_t resumeMask = 0; // streams that should be fetched by the original fetcher

    AString URIs[kMaxStreams];
    for (size_t i = 0; i < kMaxStreams; ++i) {
@@ -880,9 +984,14 @@ void LiveSession::changeConfiguration(

        // If we're seeking all current fetchers are discarded.
        if (timeUs < 0ll) {
            for (size_t j = 0; j < kMaxStreams; ++j) {
                if ((streamMask & indexToType(j)) && uri == URIs[j]) {
            // delay fetcher removal
            discardFetcher = false;

            for (size_t j = 0; j < kMaxStreams; ++j) {
                StreamType type = indexToType(j);
                if ((streamMask & type) && uri == URIs[j]) {
                    resumeMask |= type;
                    streamMask &= ~type;
                }
            }
        }
@@ -894,8 +1003,15 @@ void LiveSession::changeConfiguration(
        }
    }

    sp<AMessage> msg = new AMessage(kWhatChangeConfiguration2, id());
    sp<AMessage> msg;
    if (timeUs < 0ll) {
        // skip onChangeConfiguration2 (decoder destruction) if switching.
        msg = new AMessage(kWhatChangeConfiguration3, id());
    } else {
        msg = new AMessage(kWhatChangeConfiguration2, id());
    }
    msg->setInt32("streamMask", streamMask);
    msg->setInt32("resumeMask", resumeMask);
    msg->setInt64("timeUs", timeUs);
    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
@@ -978,11 +1094,13 @@ void LiveSession::onChangeConfiguration2(const sp<AMessage> &msg) {
}

void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    mContinuation.clear();
    // All remaining fetchers are still suspended, the player has shutdown
    // any decoders that needed it.

    uint32_t streamMask;
    uint32_t streamMask, resumeMask;
    CHECK(msg->findInt32("streamMask", (int32_t *)&streamMask));
    CHECK(msg->findInt32("resumeMask", (int32_t *)&resumeMask));

    for (size_t i = 0; i < kMaxStreams; ++i) {
        if (streamMask & indexToType(i)) {
@@ -991,37 +1109,39 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    }

    int64_t timeUs;
    bool switching = false;
    CHECK(msg->findInt64("timeUs", &timeUs));

    if (timeUs < 0ll) {
        timeUs = mLastDequeuedTimeUs;
        switching = true;
    }
    mRealTimeBaseUs = ALooper::GetNowUs() - timeUs;

    mStreamMask = streamMask;
    mNewStreamMask = streamMask;

    // Resume all existing fetchers and assign them packet sources.
    // Of all existing fetchers:
    // * Resume fetchers that are still needed and assign them original packet sources.
    // * Mark otherwise unneeded fetchers for removal.
    ALOGV("resuming fetchers for mask 0x%08x", resumeMask);
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        const AString &uri = mFetcherInfos.keyAt(i);

        uint32_t resumeMask = 0;

        sp<AnotherPacketSource> sources[kMaxStreams];
        for (size_t j = 0; j < kMaxStreams; ++j) {
            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
            if ((resumeMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));
                resumeMask |= indexToType(j);
            }
        }

        CHECK_NE(resumeMask, 0u);

        ALOGV("resuming fetchers for mask 0x%08x", resumeMask);

        streamMask &= ~resumeMask;

        mFetcherInfos.valueAt(i).mFetcher->startAsync(
        FetcherInfo &info = mFetcherInfos.editValueAt(i);
        if (sources[kAudioIndex] != NULL || sources[kVideoIndex] != NULL
                || sources[kSubtitleIndex] != NULL) {
            info.mFetcher->startAsync(
                    sources[kAudioIndex], sources[kVideoIndex], sources[kSubtitleIndex]);
        } else {
            info.mToBeRemoved = true;
        }
    }

    // streamMask now only contains the types that need a new fetcher created.
@@ -1030,6 +1150,8 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
        ALOGV("creating new fetchers for mask 0x%08x", streamMask);
    }

    // Find out when the original fetchers have buffered up to and start the new fetchers
    // at a later timestamp.
    for (size_t i = 0; i < kMaxStreams; i++) {
        if (!(indexToType(i) & streamMask)) {
            continue;
@@ -1041,12 +1163,40 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
        sp<PlaylistFetcher> fetcher = addFetcher(uri.c_str());
        CHECK(fetcher != NULL);

        int32_t latestSeq = -1;
        int64_t latestTimeUs = 0ll;
        sp<AnotherPacketSource> sources[kMaxStreams];

        // TRICKY: looping from i as earlier streams are already removed from streamMask
        for (size_t j = i; j < kMaxStreams; ++j) {
            if ((streamMask & indexToType(j)) && uri == mStreams[j].mUri) {
                sources[j] = mPacketSources.valueFor(indexToType(j));

                if (!switching) {
                    sources[j]->clear();
                } else {
                    int32_t type, seq;
                    int64_t srcTimeUs;
                    sp<AMessage> meta = sources[j]->getLatestMeta();

                    if (meta != NULL && !meta->findInt32("discontinuity", &type)) {
                        CHECK(meta->findInt32("seq", &seq));
                        if (seq > latestSeq) {
                            latestSeq = seq;
                        }
                        CHECK(meta->findInt64("timeUs", &srcTimeUs));
                        if (srcTimeUs > latestTimeUs) {
                            latestTimeUs = srcTimeUs;
                        }
                    }

                    sources[j] = mPacketSources2.valueFor(indexToType(j));
                    sources[j]->clear();
                    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
                    if (extraStreams & indexToType(j)) {
                        sources[j]->queueAccessUnit(createFormatChangeBuffer(/* swap = */ false));
                    }
                }

                streamMask &= ~indexToType(j);
            }
@@ -1056,7 +1206,9 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
                sources[kAudioIndex],
                sources[kVideoIndex],
                sources[kSubtitleIndex],
                timeUs);
                timeUs,
                latestTimeUs /* min start time(us) */,
                latestSeq >= 0 ? latestSeq + 1 : -1 /* starting sequence number hint */ );
    }

    // All fetchers have now been started, the configuration change
@@ -1065,14 +1217,61 @@ void LiveSession::onChangeConfiguration3(const sp<AMessage> &msg) {
    scheduleCheckBandwidthEvent();

    ALOGV("XXX configuration change completed.");

    mReconfigurationInProgress = false;
    if (switching) {
        mSwitchInProgress = true;
    } else {
        mStreamMask = mNewStreamMask;
    }

    if (mDisconnectReplyID != 0) {
        finishDisconnect();
    }
}

void LiveSession::onSwapped(const sp<AMessage> &msg) {
    int32_t switchGeneration;
    CHECK(msg->findInt32("switchGeneration", &switchGeneration));
    if (switchGeneration != mSwitchGeneration) {
        return;
    }

    int32_t stream;
    CHECK(msg->findInt32("stream", &stream));
    mSwapMask |= stream;
    if (mSwapMask != mStreamMask) {
        return;
    }

    // Check if new variant contains extra streams.
    uint32_t extraStreams = mNewStreamMask & (~mStreamMask);
    while (extraStreams) {
        StreamType extraStream = (StreamType) (extraStreams & ~(extraStreams - 1));
        swapPacketSource(extraStream);
        extraStreams &= ~extraStream;
    }

    tryToFinishBandwidthSwitch();
}

// Mark switch done when:
//   1. all old buffers are swapped out, AND
//   2. all old fetchers are removed.
void LiveSession::tryToFinishBandwidthSwitch() {
    bool needToRemoveFetchers = false;
    for (size_t i = 0; i < mFetcherInfos.size(); ++i) {
        if (mFetcherInfos.valueAt(i).mToBeRemoved) {
            needToRemoveFetchers = true;
            break;
        }
    }
    if (!needToRemoveFetchers && mSwapMask == mStreamMask) {
        mStreamMask = mNewStreamMask;
        mSwitchInProgress = false;
        mSwapMask = 0;
    }
}

void LiveSession::scheduleCheckBandwidthEvent() {
    sp<AMessage> msg = new AMessage(kWhatCheckBandwidth, id());
    msg->setInt32("generation", mCheckBandwidthGeneration);
@@ -1083,16 +1282,37 @@ void LiveSession::cancelCheckBandwidthEvent() {
    ++mCheckBandwidthGeneration;
}

void LiveSession::onCheckBandwidth() {
    if (mReconfigurationInProgress) {
        scheduleCheckBandwidthEvent();
        return;
void LiveSession::cancelBandwidthSwitch() {
    Mutex::Autolock lock(mSwapMutex);
    mSwitchGeneration++;
    mSwitchInProgress = false;
    mSwapMask = 0;
}

bool LiveSession::canSwitchBandwidthTo(size_t bandwidthIndex) {
    if (mReconfigurationInProgress || mSwitchInProgress) {
        return false;
    }

    if (mPrevBandwidthIndex < 0) {
        return true;
    }

    if (bandwidthIndex == (size_t)mPrevBandwidthIndex) {
        return false;
    } else if (bandwidthIndex > (size_t)mPrevBandwidthIndex) {
        return canSwitchUp();
    } else {
        return true;
    }
}

void LiveSession::onCheckBandwidth() {
    size_t bandwidthIndex = getBandwidthIndex();
    if (mPrevBandwidthIndex < 0
            || bandwidthIndex != (size_t)mPrevBandwidthIndex) {
    if (canSwitchBandwidthTo(bandwidthIndex)) {
        changeConfiguration(-1ll /* timeUs */, bandwidthIndex);
    } else {
        scheduleCheckBandwidthEvent();
    }

    // Handling the kWhatCheckBandwidth even here does _not_ automatically
+36 −0
Original line number Diff line number Diff line
@@ -83,6 +83,11 @@ struct LiveSession : public AHandler {
        kWhatPreparationFailed,
    };

    // create a format-change discontinuity
    //
    // swap:
    //   whether is format-change discontinuity should trigger a buffer swap
    sp<ABuffer> createFormatChangeBuffer(bool swap = true);
protected:
    virtual ~LiveSession();

@@ -101,6 +106,7 @@ private:
        kWhatChangeConfiguration2       = 'chC2',
        kWhatChangeConfiguration3       = 'chC3',
        kWhatFinishDisconnect2          = 'fin2',
        kWhatSwapped                    = 'swap',
    };

    struct BandwidthItem {
@@ -112,6 +118,7 @@ private:
        sp<PlaylistFetcher> mFetcher;
        int64_t mDurationUs;
        bool mIsPrepared;
        bool mToBeRemoved;
    };

    struct StreamItem {
@@ -146,9 +153,26 @@ private:
    KeyedVector<AString, FetcherInfo> mFetcherInfos;
    uint32_t mStreamMask;

    // Masks used during reconfiguration:
    // mNewStreamMask: streams in the variant playlist we're switching to;
    // we don't want to immediately overwrite the original value.
    uint32_t mNewStreamMask;

    // mSwapMask: streams that have started to playback content in the new variant playlist;
    // we use this to track reconfiguration progress.
    uint32_t mSwapMask;

    KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources;
    // A second set of packet sources that buffer content for the variant we're switching to.
    KeyedVector<StreamType, sp<AnotherPacketSource> > mPacketSources2;

    // A mutex used to serialize two sets of events:
    // * the swapping of packet sources in dequeueAccessUnit on the player thread, AND
    // * a forced bandwidth switch termination in cancelSwitch on the live looper.
    Mutex mSwapMutex;

    int32_t mCheckBandwidthGeneration;
    int32_t mSwitchGeneration;

    size_t mContinuationCounter;
    sp<AMessage> mContinuation;
@@ -157,6 +181,7 @@ private:
    int64_t mRealTimeBaseUs;

    bool mReconfigurationInProgress;
    bool mSwitchInProgress;
    uint32_t mDisconnectReplyID;

    sp<PlaylistFetcher> addFetcher(const char *uri);
@@ -199,16 +224,27 @@ private:
    void onChangeConfiguration(const sp<AMessage> &msg);
    void onChangeConfiguration2(const sp<AMessage> &msg);
    void onChangeConfiguration3(const sp<AMessage> &msg);
    void onSwapped(const sp<AMessage> &msg);
    void tryToFinishBandwidthSwitch();

    void scheduleCheckBandwidthEvent();
    void cancelCheckBandwidthEvent();

    // cancelBandwidthSwitch is atomic wrt swapPacketSource; call it to prevent packet sources
    // from being swapped out on stale discontinuities while manipulating
    // mPacketSources/mPacketSources2.
    void cancelBandwidthSwitch();

    bool canSwitchBandwidthTo(size_t bandwidthIndex);
    void onCheckBandwidth();

    void finishDisconnect();

    void postPrepared(status_t err);

    void swapPacketSource(StreamType stream);
    bool canSwitchUp();

    DISALLOW_EVIL_CONSTRUCTORS(LiveSession);
};

+234 −17

File changed.

Preview size limit exceeded, changes collapsed.

+21 −1
Original line number Diff line number Diff line
@@ -43,6 +43,7 @@ struct PlaylistFetcher : public AHandler {
        kWhatTemporarilyDoneFetching,
        kWhatPrepared,
        kWhatPreparationFailed,
        kWhatStartedAt,
    };

    PlaylistFetcher(
@@ -56,12 +57,16 @@ struct PlaylistFetcher : public AHandler {
            const sp<AnotherPacketSource> &audioSource,
            const sp<AnotherPacketSource> &videoSource,
            const sp<AnotherPacketSource> &subtitleSource,
            int64_t startTimeUs = -1ll);
            int64_t startTimeUs = -1ll,
            int64_t minStartTimeUs = 0ll /* start after this timestamp */,
            int32_t startSeqNumberHint = -1 /* try starting at this sequence number */);

    void pauseAsync();

    void stopAsync();

    void resumeUntilAsync(const sp<AMessage> &params);

protected:
    virtual ~PlaylistFetcher();
    virtual void onMessageReceived(const sp<AMessage> &msg);
@@ -76,17 +81,25 @@ private:
        kWhatPause          = 'paus',
        kWhatStop           = 'stop',
        kWhatMonitorQueue   = 'moni',
        kWhatResumeUntil    = 'rsme',
        kWhatDownloadNext   = 'dlnx',
    };

    static const int64_t kMinBufferedDurationUs;
    static const int64_t kMaxMonitorDelayUs;
    static const int32_t kNumSkipFrames;

    // notifications to mSession
    sp<AMessage> mNotify;
    sp<AMessage> mStartTimeUsNotify;

    sp<LiveSession> mSession;
    AString mURI;

    uint32_t mStreamTypeMask;
    int64_t mStartTimeUs;
    int64_t mMinStartTimeUs; // start fetching no earlier than this value
    sp<AMessage> mStopParams; // message containing the latest timestamps we should fetch.

    KeyedVector<LiveSession::StreamType, sp<AnotherPacketSource> >
        mPacketSources;
@@ -153,6 +166,9 @@ private:
    void onMonitorQueue();
    void onDownloadNext();

    // Resume a fetcher to continue until the stopping point stored in msg.
    status_t onResumeUntil(const sp<AMessage> &msg);

    status_t extractAndQueueAccessUnits(
            const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta);

@@ -165,6 +181,10 @@ private:

    void updateDuration();

    // Before resuming a fetcher in onResume, check the remaining duration is longer than that
    // returned by resumeThreshold.
    int64_t resumeThreshold(const sp<AMessage> &msg);

    DISALLOW_EVIL_CONSTRUCTORS(PlaylistFetcher);
};