Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 76c156f7 authored by Chong Zhang's avatar Chong Zhang Committed by Android (Google) Code Review
Browse files

Merge "fix threading in RTSPSource and StreamingSource" into lmp-mr1-dev

parents 852dc963 180d1b96
Loading
Loading
Loading
Loading
+44 −21
Original line number Diff line number Diff line
@@ -50,7 +50,7 @@ NuPlayer::RTSPSource::RTSPSource(
      mState(DISCONNECTED),
      mFinalResult(OK),
      mDisconnectReplyID(0),
      mBuffering(true),
      mBuffering(false),
      mSeekGeneration(0),
      mEOSTimeoutAudio(0),
      mEOSTimeoutVideo(0) {
@@ -106,9 +106,7 @@ void NuPlayer::RTSPSource::prepareAsync() {
        mHandler->connect();
    }

    sp<AMessage> notifyStart = dupNotify();
    notifyStart->setInt32("what", kWhatBufferingStart);
    notifyStart->post();
    startBufferingIfNecessary();
}

void NuPlayer::RTSPSource::start() {
@@ -144,6 +142,7 @@ void NuPlayer::RTSPSource::resume() {
}

status_t NuPlayer::RTSPSource::feedMoreTSData() {
    Mutex::Autolock _l(mBufferingLock);
    return mFinalResult;
}

@@ -195,18 +194,10 @@ bool NuPlayer::RTSPSource::haveSufficientDataOnAllTracks() {

status_t NuPlayer::RTSPSource::dequeueAccessUnit(
        bool audio, sp<ABuffer> *accessUnit) {
    if (mBuffering) {
        if (!haveSufficientDataOnAllTracks()) {
    if (!stopBufferingIfNecessary()) {
        return -EWOULDBLOCK;
    }

        mBuffering = false;

        sp<AMessage> notify = dupNotify();
        notify->setInt32("what", kWhatBufferingEnd);
        notify->post();
    }

    sp<AnotherPacketSource> source = getSource(audio);

    if (source == NULL) {
@@ -246,11 +237,7 @@ status_t NuPlayer::RTSPSource::dequeueAccessUnit(
            if (!(otherSource != NULL && otherSource->isFinished(mediaDurationUs))) {
                // We should not enter buffering mode
                // if any of the sources already have detected EOS.
                mBuffering = true;

                sp<AMessage> notify = dupNotify();
                notify->setInt32("what", kWhatBufferingStart);
                notify->post();
                startBufferingIfNecessary();
            }

            return -EWOULDBLOCK;
@@ -630,7 +617,7 @@ void NuPlayer::RTSPSource::onSDPLoaded(const sp<AMessage> &msg) {
        }

        mState = DISCONNECTED;
        mFinalResult = err;
        setError(err);

        if (mDisconnectReplyID != 0) {
            finishDisconnectIfPossible();
@@ -657,7 +644,7 @@ void NuPlayer::RTSPSource::onDisconnected(const sp<AMessage> &msg) {
    }

    mState = DISCONNECTED;
    mFinalResult = err;
    setError(err);

    if (mDisconnectReplyID != 0) {
        finishDisconnectIfPossible();
@@ -678,4 +665,40 @@ void NuPlayer::RTSPSource::finishDisconnectIfPossible() {
    mDisconnectReplyID = 0;
}

void NuPlayer::RTSPSource::setError(status_t err) {
    Mutex::Autolock _l(mBufferingLock);
    mFinalResult = err;
}

void NuPlayer::RTSPSource::startBufferingIfNecessary() {
    Mutex::Autolock _l(mBufferingLock);

    if (!mBuffering) {
        mBuffering = true;

        sp<AMessage> notify = dupNotify();
        notify->setInt32("what", kWhatBufferingStart);
        notify->post();
    }
}

bool NuPlayer::RTSPSource::stopBufferingIfNecessary() {
    Mutex::Autolock _l(mBufferingLock);

    if (mBuffering) {
        if (!haveSufficientDataOnAllTracks()) {
            return false;
        }

        mBuffering = false;

        sp<AMessage> notify = dupNotify();
        notify->setInt32("what", kWhatBufferingEnd);
        notify->post();
    }

    return true;
}


}  // namespace android
+4 −0
Original line number Diff line number Diff line
@@ -97,6 +97,7 @@ private:
    State mState;
    status_t mFinalResult;
    uint32_t mDisconnectReplyID;
    Mutex mBufferingLock;
    bool mBuffering;

    sp<ALooper> mLooper;
@@ -126,6 +127,9 @@ private:
    bool haveSufficientDataOnAllTracks();

    void setEOSTimeout(bool audio, int64_t timeout);
    void setError(status_t err);
    void startBufferingIfNecessary();
    bool stopBufferingIfNecessary();

    DISALLOW_EVIL_CONSTRUCTORS(RTSPSource);
};
+108 −18
Original line number Diff line number Diff line
@@ -37,13 +37,26 @@ NuPlayer::StreamingSource::StreamingSource(
        const sp<IStreamSource> &source)
    : Source(notify),
      mSource(source),
      mFinalResult(OK) {
      mFinalResult(OK),
      mBuffering(false) {
}

NuPlayer::StreamingSource::~StreamingSource() {
    if (mLooper != NULL) {
        mLooper->unregisterHandler(id());
        mLooper->stop();
    }
}

void NuPlayer::StreamingSource::prepareAsync() {
    if (mLooper == NULL) {
        mLooper = new ALooper;
        mLooper->setName("streaming");
        mLooper->start();

        mLooper->registerHandler(this);
    }

    notifyVideoSizeChanged();
    notifyFlagsChanged(0);
    notifyPrepared();
@@ -62,13 +75,15 @@ void NuPlayer::StreamingSource::start() {
    mTSParser = new ATSParser(parserFlags);

    mStreamListener->start();

    postReadBuffer();
}

status_t NuPlayer::StreamingSource::feedMoreTSData() {
    if (mFinalResult != OK) {
        return mFinalResult;
    return postReadBuffer();
}

void NuPlayer::StreamingSource::onReadBuffer() {
    for (int32_t i = 0; i < 50; ++i) {
        char buffer[188];
        sp<AMessage> extra;
@@ -77,7 +92,7 @@ status_t NuPlayer::StreamingSource::feedMoreTSData() {
        if (n == 0) {
            ALOGI("input data EOS reached.");
            mTSParser->signalEOS(ERROR_END_OF_STREAM);
            mFinalResult = ERROR_END_OF_STREAM;
            setError(ERROR_END_OF_STREAM);
            break;
        } else if (n == INFO_DISCONTINUITY) {
            int32_t type = ATSParser::DISCONTINUITY_TIME;
@@ -88,7 +103,8 @@ status_t NuPlayer::StreamingSource::feedMoreTSData() {
                        IStreamListener::kKeyDiscontinuityMask, &mask)) {
                if (mask == 0) {
                    ALOGE("Client specified an illegal discontinuity type.");
                    return ERROR_UNSUPPORTED;
                    setError(ERROR_UNSUPPORTED);
                    break;
                }

                type = mask;
@@ -97,7 +113,6 @@ status_t NuPlayer::StreamingSource::feedMoreTSData() {
            mTSParser->signalDiscontinuity(
                    (ATSParser::DiscontinuityType)type, extra);
        } else if (n < 0) {
            CHECK_EQ(n, -EWOULDBLOCK);
            break;
        } else {
            if (buffer[0] == 0x00) {
@@ -128,26 +143,80 @@ status_t NuPlayer::StreamingSource::feedMoreTSData() {
                    ALOGE("TS Parser returned error %d", err);

                    mTSParser->signalEOS(err);
                    mFinalResult = err;
                    setError(err);
                    break;
                }
            }
        }
    }
}

status_t NuPlayer::StreamingSource::postReadBuffer() {
    {
        Mutex::Autolock _l(mBufferingLock);
        if (mFinalResult != OK) {
            return mFinalResult;
        }
        if (mBuffering) {
            return OK;
        }
        mBuffering = true;
    }

sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
    (new AMessage(kWhatReadBuffer, id()))->post();
    return OK;
}

bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
    // We're going to buffer at least 2 secs worth data on all tracks before
    // starting playback (both at startup and after a seek).

    static const int64_t kMinDurationUs = 2000000ll;

    sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
    sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);

    status_t err;
    int64_t durationUs;
    if (audioTrack != NULL
            && (durationUs = audioTrack->getBufferedDurationUs(&err))
                    < kMinDurationUs
            && err == OK) {
        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
              durationUs / 1E6);
        return false;
    }

    if (videoTrack != NULL
            && (durationUs = videoTrack->getBufferedDurationUs(&err))
                    < kMinDurationUs
            && err == OK) {
        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
              durationUs / 1E6);
        return false;
    }

    return true;
}

void NuPlayer::StreamingSource::setError(status_t err) {
    Mutex::Autolock _l(mBufferingLock);
    mFinalResult = err;
}

sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
    if (mTSParser == NULL) {
        return NULL;
    }

    ATSParser::SourceType type =
        audio ? ATSParser::AUDIO : ATSParser::VIDEO;
    sp<MediaSource> source = mTSParser->getSource(
            audio ? ATSParser::AUDIO : ATSParser::VIDEO);

    sp<AnotherPacketSource> source =
        static_cast<AnotherPacketSource *>(mTSParser->getSource(type).get());
    return static_cast<AnotherPacketSource *>(source.get());
}

sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
    sp<AnotherPacketSource> source = getSource(audio);

    if (source == NULL) {
        return NULL;
@@ -158,16 +227,16 @@ sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {

status_t NuPlayer::StreamingSource::dequeueAccessUnit(
        bool audio, sp<ABuffer> *accessUnit) {
    ATSParser::SourceType type =
        audio ? ATSParser::AUDIO : ATSParser::VIDEO;

    sp<AnotherPacketSource> source =
        static_cast<AnotherPacketSource *>(mTSParser->getSource(type).get());
    sp<AnotherPacketSource> source = getSource(audio);

    if (source == NULL) {
        return -EWOULDBLOCK;
    }

    if (!haveSufficientDataOnAllTracks()) {
        postReadBuffer();
    }

    status_t finalResult;
    if (!source->hasBufferAvailable(&finalResult)) {
        return finalResult == OK ? -EWOULDBLOCK : finalResult;
@@ -190,5 +259,26 @@ bool NuPlayer::StreamingSource::isRealTime() const {
    return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
}

void NuPlayer::StreamingSource::onMessageReceived(
        const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatReadBuffer:
        {
            onReadBuffer();

            {
                Mutex::Autolock _l(mBufferingLock);
                mBuffering = false;
            }
            break;
        }
        default:
        {
            TRESPASS();
        }
    }
}


}  // namespace android
+16 −0
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ namespace android {

struct ABuffer;
struct ATSParser;
struct AnotherPacketSource;

struct NuPlayer::StreamingSource : public NuPlayer::Source {
    StreamingSource(
@@ -43,14 +44,29 @@ struct NuPlayer::StreamingSource : public NuPlayer::Source {
protected:
    virtual ~StreamingSource();

    virtual void onMessageReceived(const sp<AMessage> &msg);

    virtual sp<MetaData> getFormatMeta(bool audio);

private:
    enum {
        kWhatReadBuffer,
    };
    sp<IStreamSource> mSource;
    status_t mFinalResult;
    sp<NuPlayerStreamListener> mStreamListener;
    sp<ATSParser> mTSParser;

    bool mBuffering;
    Mutex mBufferingLock;
    sp<ALooper> mLooper;

    void setError(status_t err);
    sp<AnotherPacketSource> getSource(bool audio);
    bool haveSufficientDataOnAllTracks();
    status_t postReadBuffer();
    void onReadBuffer();

    DISALLOW_EVIL_CONSTRUCTORS(StreamingSource);
};