Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 43e08e01 authored by Lajos Molnar's avatar Lajos Molnar Committed by Android (Google) Code Review
Browse files

Merge "stagefright: rework MediaCodecSource" into nyc-dev

parents 0d2ef4b9 6a3a56fb
Loading
Loading
Loading
Loading
+12 −6
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@

#include <media/stagefright/foundation/ABase.h>
#include <media/stagefright/foundation/AHandlerReflector.h>
#include <media/stagefright/foundation/Mutexed.h>
#include <media/stagefright/MediaSource.h>

#include <gui/IGraphicBufferConsumer.h>
@@ -79,6 +80,7 @@ private:
        kWhatStop,
        kWhatPause,
        kWhatSetInputBufferTimeOffset,
        kWhatStopStalled,
    };

    MediaCodecSource(
@@ -127,12 +129,16 @@ private:
    int64_t mFirstSampleTimeUs;
    List<int64_t> mDriftTimeQueue;

    // following variables are protected by mOutputBufferLock
    Mutex mOutputBufferLock;
    Condition mOutputBufferCond;
    List<MediaBuffer*> mOutputBufferQueue;
    struct Output {
        Output();
        List<MediaBuffer*> mBufferQueue;
        bool mEncoderReachedEOS;
        status_t mErrorCode;
        Condition mCond;
    };
    Mutexed<Output> mOutput;

    int32_t mGeneration;

    DISALLOW_EVIL_CONSTRUCTORS(MediaCodecSource);
};
+186 −142
Original line number Diff line number Diff line
@@ -43,15 +43,19 @@ namespace android {
const int kDefaultSwVideoEncoderFormat = HAL_PIXEL_FORMAT_YCbCr_420_888;
const int kDefaultSwVideoEncoderDataSpace = HAL_DATASPACE_BT709;

const int kStopTimeoutUs = 300000; // allow 1 sec for shutting down encoder

struct MediaCodecSource::Puller : public AHandler {
    Puller(const sp<MediaSource> &source);

    status_t start(const sp<MetaData> &meta, const sp<AMessage> &notify);
    void stop();

    void stopSource();
    void pause();
    void resume();

    bool readBuffer(MediaBuffer **buffer);

protected:
    virtual void onMessageReceived(const sp<AMessage> &msg);
    virtual ~Puller();
@@ -61,17 +65,31 @@ private:
        kWhatStart = 'msta',
        kWhatStop,
        kWhatPull,
        kWhatPause,
        kWhatResume,
    };

    sp<MediaSource> mSource;
    sp<AMessage> mNotify;
    sp<ALooper> mLooper;
    int32_t mPullGeneration;
    bool mIsAudio;

    struct Queue {
        Queue()
            : mReadPendingSince(0),
              mPaused(false),
              mPulling(false) { }
        int64_t mReadPendingSince;
        bool mPaused;
    bool mReachedEOS;
        bool mPulling;
        Vector<MediaBuffer *> mReadBuffers;

        void flush();
        // if queue is empty, return false and set *|buffer| to NULL . Otherwise, pop
        // buffer from front of the queue, place it into *|buffer| and return true. 
        bool readBuffer(MediaBuffer **buffer);
        // add a buffer to the back of the queue 
        void pushBuffer(MediaBuffer *mbuf);
    };
    Mutexed<Queue> mQueue;

    status_t postSynchronouslyAndReturnError(const sp<AMessage> &msg);
    void schedulePull();
@@ -83,10 +101,8 @@ private:
MediaCodecSource::Puller::Puller(const sp<MediaSource> &source)
    : mSource(source),
      mLooper(new ALooper()),
      mPullGeneration(0),
      mIsAudio(false),
      mPaused(false),
      mReachedEOS(false) {
      mIsAudio(false)
{
    sp<MetaData> meta = source->getFormat();
    const char *mime;
    CHECK(meta->findCString(kKeyMIMEType, &mime));
@@ -101,6 +117,33 @@ MediaCodecSource::Puller::~Puller() {
    mLooper->stop();
}

void MediaCodecSource::Puller::Queue::pushBuffer(MediaBuffer *mbuf) {
    mReadBuffers.push_back(mbuf);
}

bool MediaCodecSource::Puller::Queue::readBuffer(MediaBuffer **mbuf) {
    if (mReadBuffers.empty()) {
        *mbuf = NULL;
        return false;
    }
    *mbuf = *mReadBuffers.begin();
    mReadBuffers.erase(mReadBuffers.begin());
    return true;
}

void MediaCodecSource::Puller::Queue::flush() {
    MediaBuffer *mbuf;
    while (readBuffer(&mbuf)) {
        // there are no null buffers in the queue
        mbuf->release();
    }
}

bool MediaCodecSource::Puller::readBuffer(MediaBuffer **mbuf) {
    Mutexed<Queue>::Locked queue(mQueue);
    return queue->readBuffer(mbuf);
}

status_t MediaCodecSource::Puller::postSynchronouslyAndReturnError(
        const sp<AMessage> &msg) {
    sp<AMessage> response;
@@ -117,8 +160,7 @@ status_t MediaCodecSource::Puller::postSynchronouslyAndReturnError(
    return err;
}

status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta,
        const sp<AMessage> &notify) {
status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta, const sp<AMessage> &notify) {
    ALOGV("puller (%s) start", mIsAudio ? "audio" : "video");
    mLooper->start(
            false /* runOnCallingThread */,
@@ -133,41 +175,46 @@ status_t MediaCodecSource::Puller::start(const sp<MetaData> &meta,
}

void MediaCodecSource::Puller::stop() {
    // Stop source from caller's thread instead of puller's looper.
    // mSource->stop() is thread-safe, doing it outside the puller's
    // looper allows us to at least stop if source gets stuck.
    // If source gets stuck in read(), the looper would never
    // be able to process the stop(), which could lead to ANR.
    bool interrupt = false;
    {
        // mark stopping before actually reaching kWhatStop on the looper, so the pulling will
        // stop.
        Mutexed<Queue>::Locked queue(mQueue);
        queue->mPulling = false;
        interrupt = queue->mReadPendingSince && (queue->mReadPendingSince < ALooper::GetNowUs() - 1000000);
        queue->flush(); // flush any unprocessed pulled buffers
    }

    ALOGV("source (%s) stopping", mIsAudio ? "audio" : "video");
    if (interrupt) {
        // call source->stop if read has been pending for over a second
        // TODO: we should really call this if kWhatStop has not returned for more than a second.
        mSource->stop();
    ALOGV("source (%s) stopped", mIsAudio ? "audio" : "video");
    }
}

void MediaCodecSource::Puller::stopSource() {
    (new AMessage(kWhatStop, this))->post();
}

void MediaCodecSource::Puller::pause() {
    (new AMessage(kWhatPause, this))->post();
    Mutexed<Queue>::Locked queue(mQueue);
    queue->mPaused = true;
}

void MediaCodecSource::Puller::resume() {
    (new AMessage(kWhatResume, this))->post();
    Mutexed<Queue>::Locked queue(mQueue);
    queue->mPaused = false;
}

void MediaCodecSource::Puller::schedulePull() {
    sp<AMessage> msg = new AMessage(kWhatPull, this);
    msg->setInt32("generation", mPullGeneration);
    msg->post();
    (new AMessage(kWhatPull, this))->post();
}

void MediaCodecSource::Puller::handleEOS() {
    if (!mReachedEOS) {
    ALOGV("puller (%s) posting EOS", mIsAudio ? "audio" : "video");
        mReachedEOS = true;
        sp<AMessage> notify = mNotify->dup();
        notify->setPointer("accessUnit", NULL);
        notify->post();
    }
    sp<AMessage> msg = mNotify->dup();
    msg->setInt32("eos", 1);
    msg->post();
}

void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {
@@ -177,7 +224,10 @@ void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {
            sp<RefBase> obj;
            CHECK(msg->findObject("meta", &obj));

            mReachedEOS = false;
            {
                Mutexed<Queue>::Locked queue(mQueue);
                queue->mPulling = true;
            }

            status_t err = mSource->start(static_cast<MetaData *>(obj.get()));

@@ -196,61 +246,52 @@ void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {

        case kWhatStop:
        {
            ++mPullGeneration;

            handleEOS();
            mSource->stop();
            break;
        }

        case kWhatPull:
        {
            int32_t generation;
            CHECK(msg->findInt32("generation", &generation));

            if (generation != mPullGeneration) {
            Mutexed<Queue>::Locked queue(mQueue);
            queue->mReadPendingSince = ALooper::GetNowUs();
            if (!queue->mPulling) {
                handleEOS();
                break;
            }

            MediaBuffer *mbuf;
            queue.unlock();
            MediaBuffer *mbuf = NULL;
            status_t err = mSource->read(&mbuf);
            queue.lock();

            if (mPaused) {
                if (err == OK) {
            queue->mReadPendingSince = 0;
            // if we need to discard buffer
            if (!queue->mPulling || queue->mPaused || err != OK) {
                if (mbuf != NULL) {
                    mbuf->release();
                    mbuf = NULL;
                }

                msg->post();
                break;
            }

            if (err != OK) {
                if (err == ERROR_END_OF_STREAM) {
                if (queue->mPulling && err == OK) {
                    msg->post(); // if simply paused, keep pulling source
                } else if (err == ERROR_END_OF_STREAM) {
                    ALOGV("stream ended, mbuf %p", mbuf);
                } else {
                } else if (err != OK) {
                    ALOGE("error %d reading stream.", err);
                }
                handleEOS();
            } else {
                sp<AMessage> notify = mNotify->dup();

                notify->setPointer("accessUnit", mbuf);
                notify->post();

                msg->post();
            }
            break;
            }

        case kWhatPause:
        {
            mPaused = true;
            break;
            if (mbuf != NULL) {
                queue->pushBuffer(mbuf);
            }

        case kWhatResume:
        {
            mPaused = false;
            queue.unlock();

            if (mbuf != NULL) {
                mNotify->post();
                msg->post();
            } else {
                handleEOS();
            }
            break;
        }

@@ -259,6 +300,11 @@ void MediaCodecSource::Puller::onMessageReceived(const sp<AMessage> &msg) {
    }
}

MediaCodecSource::Output::Output()
    : mEncoderReachedEOS(false),
      mErrorCode(OK) {
}

// static
sp<MediaCodecSource> MediaCodecSource::Create(
        const sp<ALooper> &looper,
@@ -289,21 +335,7 @@ status_t MediaCodecSource::start(MetaData* params) {

status_t MediaCodecSource::stop() {
    sp<AMessage> msg = new AMessage(kWhatStop, mReflector);
    status_t err = postSynchronouslyAndReturnError(msg);

    // mPuller->stop() needs to be done outside MediaCodecSource's looper,
    // as it contains a synchronous call to stop the underlying MediaSource,
    // which often waits for all outstanding MediaBuffers to return, but
    // MediaBuffers are only returned when MediaCodecSource looper gets
    // to process them.

    if (mPuller != NULL) {
        ALOGI("puller (%s) stopping", mIsVideo ? "video" : "audio");
        mPuller->stop();
        ALOGI("puller (%s) stopped", mIsVideo ? "video" : "audio");
    }

    return err;
    return postSynchronouslyAndReturnError(msg);
}

status_t MediaCodecSource::pause() {
@@ -318,18 +350,18 @@ sp<IGraphicBufferProducer> MediaCodecSource::getGraphicBufferProducer() {

status_t MediaCodecSource::read(
        MediaBuffer** buffer, const ReadOptions* /* options */) {
    Mutex::Autolock autolock(mOutputBufferLock);
    Mutexed<Output>::Locked output(mOutput);

    *buffer = NULL;
    while (mOutputBufferQueue.size() == 0 && !mEncoderReachedEOS) {
        mOutputBufferCond.wait(mOutputBufferLock);
    while (output->mBufferQueue.size() == 0 && !output->mEncoderReachedEOS) {
        output.waitForCondition(output->mCond);
    }
    if (!mEncoderReachedEOS) {
        *buffer = *mOutputBufferQueue.begin();
        mOutputBufferQueue.erase(mOutputBufferQueue.begin());
    if (!output->mEncoderReachedEOS) {
        *buffer = *output->mBufferQueue.begin();
        output->mBufferQueue.erase(output->mBufferQueue.begin());
        return OK;
    }
    return mErrorCode;
    return output->mErrorCode;
}

void MediaCodecSource::signalBufferReturned(MediaBuffer *buffer) {
@@ -357,8 +389,7 @@ MediaCodecSource::MediaCodecSource(
      mGraphicBufferConsumer(consumer),
      mInputBufferTimeOffsetUs(0),
      mFirstSampleTimeUs(-1ll),
      mEncoderReachedEOS(false),
      mErrorCode(OK) {
      mGeneration(0) {
    CHECK(mLooper != NULL);

    AString mime;
@@ -485,8 +516,11 @@ status_t MediaCodecSource::initEncoder() {
        return err;
    }

    mEncoderReachedEOS = false;
    mErrorCode = OK;
    {
        Mutexed<Output>::Locked output(mOutput);
        output->mEncoderReachedEOS = false;
        output->mErrorCode = OK;
    }

    return OK;
}
@@ -498,14 +532,6 @@ void MediaCodecSource::releaseEncoder() {

    mEncoder->release();
    mEncoder.clear();

    while (!mInputBufferQueue.empty()) {
        MediaBuffer *mbuf = *mInputBufferQueue.begin();
        mInputBufferQueue.erase(mInputBufferQueue.begin());
        if (mbuf != NULL) {
            mbuf->release();
        }
    }
}

status_t MediaCodecSource::postSynchronouslyAndReturnError(
@@ -525,25 +551,32 @@ status_t MediaCodecSource::postSynchronouslyAndReturnError(
}

void MediaCodecSource::signalEOS(status_t err) {
    if (!mEncoderReachedEOS) {
        ALOGV("encoder (%s) reached EOS", mIsVideo ? "video" : "audio");
    bool reachedEOS = false;
    {
            Mutex::Autolock autoLock(mOutputBufferLock);
        Mutexed<Output>::Locked output(mOutput);
        reachedEOS = output->mEncoderReachedEOS;
        if (!reachedEOS) {
            ALOGV("encoder (%s) reached EOS", mIsVideo ? "video" : "audio");
            // release all unread media buffers
            for (List<MediaBuffer*>::iterator it = mOutputBufferQueue.begin();
                    it != mOutputBufferQueue.end(); it++) {
            for (List<MediaBuffer*>::iterator it = output->mBufferQueue.begin();
                    it != output->mBufferQueue.end(); it++) {
                (*it)->release();
            }
            mOutputBufferQueue.clear();
            mEncoderReachedEOS = true;
            mErrorCode = err;
            mOutputBufferCond.signal();
        }
            output->mBufferQueue.clear();
            output->mEncoderReachedEOS = true;
            output->mErrorCode = err;
            output->mCond.signal();

            reachedEOS = true;
            output.unlock();
            releaseEncoder();
        }
    if (mStopping && mEncoderReachedEOS) {
    }

    if (mStopping && reachedEOS) {
        ALOGI("encoder (%s) stopped", mIsVideo ? "video" : "audio");
        mPuller->stopSource();
        ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
        // posting reply to everyone that's waiting
        List<sp<AReplyToken>>::iterator it;
        for (it = mStopReplyIDQueue.begin();
@@ -552,6 +585,7 @@ void MediaCodecSource::signalEOS(status_t err) {
        }
        mStopReplyIDQueue.clear();
        mStopping = false;
        ++mGeneration;
    }
}

@@ -577,11 +611,8 @@ void MediaCodecSource::resume(int64_t skipFramesBeforeUs) {
}

status_t MediaCodecSource::feedEncoderInputBuffers() {
    while (!mInputBufferQueue.empty()
            && !mAvailEncoderInputIndices.empty()) {
        MediaBuffer* mbuf = *mInputBufferQueue.begin();
        mInputBufferQueue.erase(mInputBufferQueue.begin());

    MediaBuffer* mbuf = NULL;
    while (!mAvailEncoderInputIndices.empty() && mPuller->readBuffer(&mbuf)) {
        size_t bufferIndex = *mAvailEncoderInputIndices.begin();
        mAvailEncoderInputIndices.erase(mAvailEncoderInputIndices.begin());

@@ -700,30 +731,19 @@ void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {
    switch (msg->what()) {
    case kWhatPullerNotify:
    {
        MediaBuffer *mbuf;
        CHECK(msg->findPointer("accessUnit", (void**)&mbuf));

        if (mbuf == NULL) {
            ALOGV("puller (%s) reached EOS",
                    mIsVideo ? "video" : "audio");
        int32_t eos = 0;
        if (msg->findInt32("eos", &eos) && eos) {
            ALOGV("puller (%s) reached EOS", mIsVideo ? "video" : "audio");
            signalEOS();
            break;
        }

        if (mEncoder == NULL) {
            ALOGV("got msg '%s' after encoder shutdown.",
                  msg->debugString().c_str());

            if (mbuf != NULL) {
                mbuf->release();
            }

            ALOGV("got msg '%s' after encoder shutdown.", msg->debugString().c_str());
            break;
        }

        mInputBufferQueue.push_back(mbuf);

        feedEncoderInputBuffers();

        break;
    }
    case kWhatEncoderActivity:
@@ -815,9 +835,9 @@ void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {
            mbuf->add_ref();

            {
                Mutex::Autolock autoLock(mOutputBufferLock);
                mOutputBufferQueue.push_back(mbuf);
                mOutputBufferCond.signal();
                Mutexed<Output>::Locked output(mOutput);
                output->mBufferQueue.push_back(mbuf);
                output->mCond.signal();
            }

            mEncoder->releaseOutputBuffer(index);
@@ -851,7 +871,7 @@ void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {
        sp<AReplyToken> replyID;
        CHECK(msg->senderAwaitsResponse(&replyID));

        if (mEncoderReachedEOS) {
        if (mOutput.lock()->mEncoderReachedEOS) {
            // if we already reached EOS, reply and return now
            ALOGI("encoder (%s) already stopped",
                    mIsVideo ? "video" : "audio");
@@ -869,14 +889,38 @@ void MediaCodecSource::onMessageReceived(const sp<AMessage> &msg) {
        mStopping = true;

        // if using surface, signal source EOS and wait for EOS to come back.
        // otherwise, release encoder and post EOS if haven't done already
        // otherwise, stop puller (which also clears the input buffer queue)
        // and wait for the EOS message. We cannot call source->stop() because
        // the encoder may still be processing input buffers.
        if (mFlags & FLAG_USE_SURFACE_INPUT) {
            mEncoder->signalEndOfInputStream();
        } else {
            signalEOS();
            mPuller->stop();
        }

        // complete stop even if encoder/puller stalled
        sp<AMessage> timeoutMsg = new AMessage(kWhatStopStalled, mReflector);
        timeoutMsg->setInt32("generation", mGeneration);
        timeoutMsg->post(kStopTimeoutUs);
        break;
    }

    case kWhatStopStalled:
    {
        int32_t generation;
        CHECK(msg->findInt32("generation", &generation));
        if (generation != mGeneration) {
             break;
        }

        if (!(mFlags & FLAG_USE_SURFACE_INPUT)) {
            ALOGV("source (%s) stopping", mIsVideo ? "video" : "audio");
            mPuller->stopSource();
            ALOGV("source (%s) stopped", mIsVideo ? "video" : "audio");
        }
        signalEOS();
    }

    case kWhatPause:
    {
        if (mFlags & FLAG_USE_SURFACE_INPUT) {