Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 3a177524 authored by Andreas Huber's avatar Andreas Huber
Browse files

Properly order the data written out to the transport stream by timestamp.

This ensures data locality across tracks.

Change-Id: I823b9407e3603473422177ec02dff4860ffc5b4f
parent 00502849
Loading
Loading
Loading
Loading
+140 −11
Original line number Diff line number Diff line
@@ -42,12 +42,21 @@ struct MPEG2TSWriter::SourceInfo : public AHandler {
    unsigned streamType() const;
    unsigned incrementContinuityCounter();

    void readMore();

    enum {
        kNotifyStartFailed,
        kNotifyBuffer,
        kNotifyReachedEOS,
    };

    sp<ABuffer> lastAccessUnit();
    int64_t lastAccessUnitTimeUs();
    void setLastAccessUnit(const sp<ABuffer> &accessUnit);

    void setEOSReceived();
    bool eosReceived() const;

protected:
    virtual void onMessageReceived(const sp<AMessage> &msg);

@@ -67,13 +76,16 @@ private:

    sp<ABuffer> mAACBuffer;

    sp<ABuffer> mLastAccessUnit;
    bool mEOSReceived;

    unsigned mStreamType;
    unsigned mContinuityCounter;

    void extractCodecSpecificData();

    void appendAACFrames(MediaBuffer *buffer);
    void flushAACFrames();
    bool appendAACFrames(MediaBuffer *buffer);
    bool flushAACFrames();

    void postAVCFrame(MediaBuffer *buffer);

@@ -83,6 +95,7 @@ private:
MPEG2TSWriter::SourceInfo::SourceInfo(const sp<MediaSource> &source)
    : mSource(source),
      mLooper(new ALooper),
      mEOSReceived(false),
      mStreamType(0),
      mContinuityCounter(0) {
    mLooper->setName("MPEG2TSWriter source");
@@ -232,6 +245,7 @@ void MPEG2TSWriter::SourceInfo::extractCodecSpecificData() {
    sp<AMessage> notify = mNotify->dup();
    notify->setInt32("what", kNotifyBuffer);
    notify->setObject("buffer", out);
    notify->setInt32("oob", true);
    notify->post();
}

@@ -260,11 +274,13 @@ void MPEG2TSWriter::SourceInfo::postAVCFrame(MediaBuffer *buffer) {
    notify->post();
}

void MPEG2TSWriter::SourceInfo::appendAACFrames(MediaBuffer *buffer) {
bool MPEG2TSWriter::SourceInfo::appendAACFrames(MediaBuffer *buffer) {
    bool accessUnitPosted = false;

    if (mAACBuffer != NULL
            && mAACBuffer->size() + 7 + buffer->range_length()
                    > mAACBuffer->capacity()) {
        flushAACFrames();
        accessUnitPosted = flushAACFrames();
    }

    if (mAACBuffer == NULL) {
@@ -324,11 +340,13 @@ void MPEG2TSWriter::SourceInfo::appendAACFrames(MediaBuffer *buffer) {
    ptr += buffer->range_length();

    mAACBuffer->setRange(0, ptr - mAACBuffer->data());

    return accessUnitPosted;
}

void MPEG2TSWriter::SourceInfo::flushAACFrames() {
bool MPEG2TSWriter::SourceInfo::flushAACFrames() {
    if (mAACBuffer == NULL) {
        return;
        return false;
    }

    sp<AMessage> notify = mNotify->dup();
@@ -337,6 +355,12 @@ void MPEG2TSWriter::SourceInfo::flushAACFrames() {
    notify->post();

    mAACBuffer.clear();

    return true;
}

void MPEG2TSWriter::SourceInfo::readMore() {
    (new AMessage(kWhatRead, id()))->post();
}

void MPEG2TSWriter::SourceInfo::onMessageReceived(const sp<AMessage> &msg) {
@@ -353,7 +377,7 @@ void MPEG2TSWriter::SourceInfo::onMessageReceived(const sp<AMessage> &msg) {

            extractCodecSpecificData();

            (new AMessage(kWhatRead, id()))->post();
            readMore();
            break;
        }

@@ -388,7 +412,9 @@ void MPEG2TSWriter::SourceInfo::onMessageReceived(const sp<AMessage> &msg) {
                           buffer->range_length());
                } else if (buffer->range_length() > 0) {
                    if (mStreamType == 0x0f) {
                        appendAACFrames(buffer);
                        if (!appendAACFrames(buffer)) {
                            msg->post();
                        }
                    } else {
                        postAVCFrame(buffer);
                    }
@@ -398,7 +424,7 @@ void MPEG2TSWriter::SourceInfo::onMessageReceived(const sp<AMessage> &msg) {
                buffer = NULL;
            }

            msg->post();
            // Do not read more data until told to.
            break;
        }

@@ -407,6 +433,35 @@ void MPEG2TSWriter::SourceInfo::onMessageReceived(const sp<AMessage> &msg) {
    }
}

sp<ABuffer> MPEG2TSWriter::SourceInfo::lastAccessUnit() {
    return mLastAccessUnit;
}

void MPEG2TSWriter::SourceInfo::setLastAccessUnit(
        const sp<ABuffer> &accessUnit) {
    mLastAccessUnit = accessUnit;
}

int64_t MPEG2TSWriter::SourceInfo::lastAccessUnitTimeUs() {
    if (mLastAccessUnit == NULL) {
        return -1;
    }

    int64_t timeUs;
    CHECK(mLastAccessUnit->meta()->findInt64("timeUs", &timeUs));

    return timeUs;
}

void MPEG2TSWriter::SourceInfo::setEOSReceived() {
    CHECK(!mEOSReceived);
    mEOSReceived = true;
}

bool MPEG2TSWriter::SourceInfo::eosReceived() const {
    return mEOSReceived;
}

////////////////////////////////////////////////////////////////////////////////

MPEG2TSWriter::MPEG2TSWriter(int fd)
@@ -527,15 +582,89 @@ void MPEG2TSWriter::onMessageReceived(const sp<AMessage> &msg) {

            if (what == SourceInfo::kNotifyReachedEOS
                    || what == SourceInfo::kNotifyStartFailed) {
                sp<SourceInfo> source = mSources.editItemAt(sourceIndex);
                source->setEOSReceived();

                sp<ABuffer> buffer = source->lastAccessUnit();
                source->setLastAccessUnit(NULL);

                if (buffer != NULL) {
                    writeTS();
                    writeAccessUnit(sourceIndex, buffer);
                }

                ++mNumSourcesDone;
            } else if (what == SourceInfo::kNotifyBuffer) {
                sp<RefBase> obj;
                CHECK(msg->findObject("buffer", &obj));

                writeTS();

                sp<ABuffer> buffer = static_cast<ABuffer *>(obj.get());

                int32_t oob;
                if (msg->findInt32("oob", &oob) && oob) {
                    // This is codec specific data delivered out of band.
                    // It can be written out immediately.
                    writeTS();
                    writeAccessUnit(sourceIndex, buffer);
                    break;
                }

                // We don't just write out data as we receive it from
                // the various sources. That would essentially write them
                // out in random order (as the thread scheduler determines
                // how the messages are dispatched).
                // Instead we gather an access unit for all tracks and
                // write out the one with the smallest timestamp, then
                // request more data for the written out track.
                // Rinse, repeat.
                // If we don't have data on any track we don't write
                // anything just yet.

                sp<SourceInfo> source = mSources.editItemAt(sourceIndex);

                CHECK(source->lastAccessUnit() == NULL);
                source->setLastAccessUnit(buffer);

                LOGV("lastAccessUnitTimeUs[%d] = %.2f secs",
                     sourceIndex, source->lastAccessUnitTimeUs() / 1E6);

                int64_t minTimeUs = -1;
                size_t minIndex = 0;

                for (size_t i = 0; i < mSources.size(); ++i) {
                    const sp<SourceInfo> &source = mSources.editItemAt(i);

                    if (source->eosReceived()) {
                        continue;
                    }

                    int64_t timeUs = source->lastAccessUnitTimeUs();
                    if (timeUs < 0) {
                        minTimeUs = -1;
                        break;
                    } else if (minTimeUs < 0 || timeUs < minTimeUs) {
                        minTimeUs = timeUs;
                        minIndex = i;
                    }
                }

                if (minTimeUs < 0) {
                    LOGV("not a all tracks have valid data.");
                    break;
                }

                LOGV("writing access unit at time %.2f secs (index %d)",
                     minTimeUs / 1E6, minIndex);

                source = mSources.editItemAt(minIndex);

                buffer = source->lastAccessUnit();
                source->setLastAccessUnit(NULL);

                writeTS();
                writeAccessUnit(minIndex, buffer);

                source->readMore();
            }
            break;
        }