Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7cc9a1c5 authored by Chong Zhang's avatar Chong Zhang Committed by Android Git Automerger
Browse files

am 76c156f7: Merge "fix threading in RTSPSource and StreamingSource" into lmp-mr1-dev

* commit '76c156f7':
  fix threading in RTSPSource and StreamingSource
parents 25bad49c 76c156f7
Loading
Loading
Loading
Loading
+44 −21
Original line number Diff line number Diff line
@@ -50,7 +50,7 @@ NuPlayer::RTSPSource::RTSPSource(
      mState(DISCONNECTED),
      mFinalResult(OK),
      mDisconnectReplyID(0),
      mBuffering(true),
      mBuffering(false),
      mSeekGeneration(0),
      mEOSTimeoutAudio(0),
      mEOSTimeoutVideo(0) {
@@ -106,9 +106,7 @@ void NuPlayer::RTSPSource::prepareAsync() {
        mHandler->connect();
    }

    sp<AMessage> notifyStart = dupNotify();
    notifyStart->setInt32("what", kWhatBufferingStart);
    notifyStart->post();
    startBufferingIfNecessary();
}

void NuPlayer::RTSPSource::start() {
@@ -144,6 +142,7 @@ void NuPlayer::RTSPSource::resume() {
}

status_t NuPlayer::RTSPSource::feedMoreTSData() {
    Mutex::Autolock _l(mBufferingLock);
    return mFinalResult;
}

@@ -195,18 +194,10 @@ bool NuPlayer::RTSPSource::haveSufficientDataOnAllTracks() {

status_t NuPlayer::RTSPSource::dequeueAccessUnit(
        bool audio, sp<ABuffer> *accessUnit) {
    if (mBuffering) {
        if (!haveSufficientDataOnAllTracks()) {
    if (!stopBufferingIfNecessary()) {
        return -EWOULDBLOCK;
    }

        mBuffering = false;

        sp<AMessage> notify = dupNotify();
        notify->setInt32("what", kWhatBufferingEnd);
        notify->post();
    }

    sp<AnotherPacketSource> source = getSource(audio);

    if (source == NULL) {
@@ -246,11 +237,7 @@ status_t NuPlayer::RTSPSource::dequeueAccessUnit(
            if (!(otherSource != NULL && otherSource->isFinished(mediaDurationUs))) {
                // We should not enter buffering mode
                // if any of the sources already have detected EOS.
                mBuffering = true;

                sp<AMessage> notify = dupNotify();
                notify->setInt32("what", kWhatBufferingStart);
                notify->post();
                startBufferingIfNecessary();
            }

            return -EWOULDBLOCK;
@@ -630,7 +617,7 @@ void NuPlayer::RTSPSource::onSDPLoaded(const sp<AMessage> &msg) {
        }

        mState = DISCONNECTED;
        mFinalResult = err;
        setError(err);

        if (mDisconnectReplyID != 0) {
            finishDisconnectIfPossible();
@@ -657,7 +644,7 @@ void NuPlayer::RTSPSource::onDisconnected(const sp<AMessage> &msg) {
    }

    mState = DISCONNECTED;
    mFinalResult = err;
    setError(err);

    if (mDisconnectReplyID != 0) {
        finishDisconnectIfPossible();
@@ -678,4 +665,40 @@ void NuPlayer::RTSPSource::finishDisconnectIfPossible() {
    mDisconnectReplyID = 0;
}

void NuPlayer::RTSPSource::setError(status_t err) {
    Mutex::Autolock _l(mBufferingLock);
    mFinalResult = err;
}

void NuPlayer::RTSPSource::startBufferingIfNecessary() {
    Mutex::Autolock _l(mBufferingLock);

    if (!mBuffering) {
        mBuffering = true;

        sp<AMessage> notify = dupNotify();
        notify->setInt32("what", kWhatBufferingStart);
        notify->post();
    }
}

bool NuPlayer::RTSPSource::stopBufferingIfNecessary() {
    Mutex::Autolock _l(mBufferingLock);

    if (mBuffering) {
        if (!haveSufficientDataOnAllTracks()) {
            return false;
        }

        mBuffering = false;

        sp<AMessage> notify = dupNotify();
        notify->setInt32("what", kWhatBufferingEnd);
        notify->post();
    }

    return true;
}


}  // namespace android
+4 −0
Original line number Diff line number Diff line
@@ -97,6 +97,7 @@ private:
    State mState;
    status_t mFinalResult;
    uint32_t mDisconnectReplyID;
    Mutex mBufferingLock;
    bool mBuffering;

    sp<ALooper> mLooper;
@@ -126,6 +127,9 @@ private:
    bool haveSufficientDataOnAllTracks();

    void setEOSTimeout(bool audio, int64_t timeout);
    void setError(status_t err);
    void startBufferingIfNecessary();
    bool stopBufferingIfNecessary();

    DISALLOW_EVIL_CONSTRUCTORS(RTSPSource);
};
+108 −18
Original line number Diff line number Diff line
@@ -37,13 +37,26 @@ NuPlayer::StreamingSource::StreamingSource(
        const sp<IStreamSource> &source)
    : Source(notify),
      mSource(source),
      mFinalResult(OK) {
      mFinalResult(OK),
      mBuffering(false) {
}

NuPlayer::StreamingSource::~StreamingSource() {
    if (mLooper != NULL) {
        mLooper->unregisterHandler(id());
        mLooper->stop();
    }
}

void NuPlayer::StreamingSource::prepareAsync() {
    if (mLooper == NULL) {
        mLooper = new ALooper;
        mLooper->setName("streaming");
        mLooper->start();

        mLooper->registerHandler(this);
    }

    notifyVideoSizeChanged();
    notifyFlagsChanged(0);
    notifyPrepared();
@@ -62,13 +75,15 @@ void NuPlayer::StreamingSource::start() {
    mTSParser = new ATSParser(parserFlags);

    mStreamListener->start();

    postReadBuffer();
}

status_t NuPlayer::StreamingSource::feedMoreTSData() {
    if (mFinalResult != OK) {
        return mFinalResult;
    return postReadBuffer();
}

void NuPlayer::StreamingSource::onReadBuffer() {
    for (int32_t i = 0; i < 50; ++i) {
        char buffer[188];
        sp<AMessage> extra;
@@ -77,7 +92,7 @@ status_t NuPlayer::StreamingSource::feedMoreTSData() {
        if (n == 0) {
            ALOGI("input data EOS reached.");
            mTSParser->signalEOS(ERROR_END_OF_STREAM);
            mFinalResult = ERROR_END_OF_STREAM;
            setError(ERROR_END_OF_STREAM);
            break;
        } else if (n == INFO_DISCONTINUITY) {
            int32_t type = ATSParser::DISCONTINUITY_TIME;
@@ -88,7 +103,8 @@ status_t NuPlayer::StreamingSource::feedMoreTSData() {
                        IStreamListener::kKeyDiscontinuityMask, &mask)) {
                if (mask == 0) {
                    ALOGE("Client specified an illegal discontinuity type.");
                    return ERROR_UNSUPPORTED;
                    setError(ERROR_UNSUPPORTED);
                    break;
                }

                type = mask;
@@ -97,7 +113,6 @@ status_t NuPlayer::StreamingSource::feedMoreTSData() {
            mTSParser->signalDiscontinuity(
                    (ATSParser::DiscontinuityType)type, extra);
        } else if (n < 0) {
            CHECK_EQ(n, -EWOULDBLOCK);
            break;
        } else {
            if (buffer[0] == 0x00) {
@@ -128,26 +143,80 @@ status_t NuPlayer::StreamingSource::feedMoreTSData() {
                    ALOGE("TS Parser returned error %d", err);

                    mTSParser->signalEOS(err);
                    mFinalResult = err;
                    setError(err);
                    break;
                }
            }
        }
    }
}

status_t NuPlayer::StreamingSource::postReadBuffer() {
    {
        Mutex::Autolock _l(mBufferingLock);
        if (mFinalResult != OK) {
            return mFinalResult;
        }
        if (mBuffering) {
            return OK;
        }
        mBuffering = true;
    }

sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
    (new AMessage(kWhatReadBuffer, id()))->post();
    return OK;
}

bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
    // We're going to buffer at least 2 secs worth data on all tracks before
    // starting playback (both at startup and after a seek).

    static const int64_t kMinDurationUs = 2000000ll;

    sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
    sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);

    status_t err;
    int64_t durationUs;
    if (audioTrack != NULL
            && (durationUs = audioTrack->getBufferedDurationUs(&err))
                    < kMinDurationUs
            && err == OK) {
        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
              durationUs / 1E6);
        return false;
    }

    if (videoTrack != NULL
            && (durationUs = videoTrack->getBufferedDurationUs(&err))
                    < kMinDurationUs
            && err == OK) {
        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
              durationUs / 1E6);
        return false;
    }

    return true;
}

void NuPlayer::StreamingSource::setError(status_t err) {
    Mutex::Autolock _l(mBufferingLock);
    mFinalResult = err;
}

sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
    if (mTSParser == NULL) {
        return NULL;
    }

    ATSParser::SourceType type =
        audio ? ATSParser::AUDIO : ATSParser::VIDEO;
    sp<MediaSource> source = mTSParser->getSource(
            audio ? ATSParser::AUDIO : ATSParser::VIDEO);

    sp<AnotherPacketSource> source =
        static_cast<AnotherPacketSource *>(mTSParser->getSource(type).get());
    return static_cast<AnotherPacketSource *>(source.get());
}

sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
    sp<AnotherPacketSource> source = getSource(audio);

    if (source == NULL) {
        return NULL;
@@ -158,16 +227,16 @@ sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {

status_t NuPlayer::StreamingSource::dequeueAccessUnit(
        bool audio, sp<ABuffer> *accessUnit) {
    ATSParser::SourceType type =
        audio ? ATSParser::AUDIO : ATSParser::VIDEO;

    sp<AnotherPacketSource> source =
        static_cast<AnotherPacketSource *>(mTSParser->getSource(type).get());
    sp<AnotherPacketSource> source = getSource(audio);

    if (source == NULL) {
        return -EWOULDBLOCK;
    }

    if (!haveSufficientDataOnAllTracks()) {
        postReadBuffer();
    }

    status_t finalResult;
    if (!source->hasBufferAvailable(&finalResult)) {
        return finalResult == OK ? -EWOULDBLOCK : finalResult;
@@ -190,5 +259,26 @@ bool NuPlayer::StreamingSource::isRealTime() const {
    return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
}

void NuPlayer::StreamingSource::onMessageReceived(
        const sp<AMessage> &msg) {
    switch (msg->what()) {
        case kWhatReadBuffer:
        {
            onReadBuffer();

            {
                Mutex::Autolock _l(mBufferingLock);
                mBuffering = false;
            }
            break;
        }
        default:
        {
            TRESPASS();
        }
    }
}


}  // namespace android
+16 −0
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ namespace android {

struct ABuffer;
struct ATSParser;
struct AnotherPacketSource;

struct NuPlayer::StreamingSource : public NuPlayer::Source {
    StreamingSource(
@@ -43,14 +44,29 @@ struct NuPlayer::StreamingSource : public NuPlayer::Source {
protected:
    virtual ~StreamingSource();

    virtual void onMessageReceived(const sp<AMessage> &msg);

    virtual sp<MetaData> getFormatMeta(bool audio);

private:
    enum {
        kWhatReadBuffer,
    };
    sp<IStreamSource> mSource;
    status_t mFinalResult;
    sp<NuPlayerStreamListener> mStreamListener;
    sp<ATSParser> mTSParser;

    bool mBuffering;
    Mutex mBufferingLock;
    sp<ALooper> mLooper;

    void setError(status_t err);
    sp<AnotherPacketSource> getSource(bool audio);
    bool haveSufficientDataOnAllTracks();
    status_t postReadBuffer();
    void onReadBuffer();

    DISALLOW_EVIL_CONSTRUCTORS(StreamingSource);
};