Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7a39abac authored by Robert Shih's avatar Robert Shih Committed by Android Git Automerger
Browse files

am 49b2e310: am a1f0c62c: am 43ca783e: httplive: block-by-block fetch, decrypt, and parse ts files.

* commit '49b2e310':
  httplive: block-by-block fetch, decrypt, and parse ts files.
parents 286c7d91 49b2e310
Loading
Loading
Loading
Loading
+6 −4
Original line number Diff line number Diff line
@@ -622,7 +622,7 @@ sp<PlaylistFetcher> LiveSession::addFetcher(const char *uri) {
 * - block_size == 0 means entire range
 *
 */
status_t LiveSession::fetchFile(
ssize_t LiveSession::fetchFile(
        const char *url, sp<ABuffer> *out,
        int64_t range_offset, int64_t range_length,
        uint32_t block_size, /* download block size */
@@ -673,6 +673,7 @@ status_t LiveSession::fetchFile(
        buffer->setRange(0, 0);
    }

    ssize_t bytesRead = 0;
    // adjust range_length if only reading partial block
    if (block_size > 0 && (range_length == -1 || buffer->size() + block_size < range_length)) {
        range_length = buffer->size() + block_size;
@@ -720,6 +721,7 @@ status_t LiveSession::fetchFile(
        }

        buffer->setRange(0, buffer->size() + (size_t)n);
        bytesRead += n;
    }

    *out = buffer;
@@ -730,7 +732,7 @@ status_t LiveSession::fetchFile(
        }
    }

    return OK;
    return bytesRead;
}

sp<M3UParser> LiveSession::fetchPlaylist(
@@ -741,9 +743,9 @@ sp<M3UParser> LiveSession::fetchPlaylist(

    sp<ABuffer> buffer;
    String8 actualUrl;
    status_t err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);
    ssize_t  err = fetchFile(url, &buffer, 0, -1, 0, NULL, &actualUrl);

    if (err != OK) {
    if (err <= 0) {
        return NULL;
    }

+1 −1
Original line number Diff line number Diff line
@@ -203,7 +203,7 @@ private:
    //
    // For reused HTTP sources, the caller must download a file sequentially without
    // any overlaps or gaps to prevent reconnection.
    status_t fetchFile(
    ssize_t fetchFile(
            const char *url, sp<ABuffer> *out,
            /* request/open a file starting at range_offset for range_length bytes */
            int64_t range_offset = 0, int64_t range_length = -1,
+270 −181
Original line number Diff line number Diff line
@@ -48,6 +48,7 @@ namespace android {
// static
const int64_t PlaylistFetcher::kMinBufferedDurationUs = 10000000ll;
const int64_t PlaylistFetcher::kMaxMonitorDelayUs = 3000000ll;
const int32_t PlaylistFetcher::kDownloadBlockSize = 192;
const int32_t PlaylistFetcher::kNumSkipFrames = 10;

PlaylistFetcher::PlaylistFetcher(
@@ -216,9 +217,9 @@ status_t PlaylistFetcher::decryptBuffer(
    if (index >= 0) {
        key = mAESKeyForURI.valueAt(index);
    } else {
        status_t err = mSession->fetchFile(keyURI.c_str(), &key);
        ssize_t err = mSession->fetchFile(keyURI.c_str(), &key);

        if (err != OK) {
        if (err < 0) {
            ALOGE("failed to fetch cipher key from '%s'.", keyURI.c_str());
            return ERROR_IO;
        } else if (key->size() != 16) {
@@ -704,6 +705,11 @@ status_t PlaylistFetcher::refreshPlaylist() {
    return OK;
}

// static
bool PlaylistFetcher::bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer) {
    return buffer->size() > 0 && buffer->data()[0] == 0x47;
}

void PlaylistFetcher::onDownloadNext() {
    if (refreshPlaylist() != OK) {
        return;
@@ -824,11 +830,29 @@ void PlaylistFetcher::onDownloadNext() {

    ALOGV("fetching '%s'", uri.c_str());

    sp<ABuffer> buffer;
    status_t err = mSession->fetchFile(
            uri.c_str(), &buffer, range_offset, range_length);

    sp<DataSource> source;
    sp<ABuffer> buffer, tsBuffer;
    // decrypt a junk buffer to prefetch key; since a session uses only one http connection,
    // this avoids interleaved connections to the key and segment file.
    {
        sp<ABuffer> junk = new ABuffer(16);
        junk->setRange(0, 16);
        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, junk,
                true /* first */);
        if (err != OK) {
            notifyError(err);
            return;
        }
    }

    // block-wise download
    ssize_t bytesRead;
    do {
        bytesRead = mSession->fetchFile(
                uri.c_str(), &buffer, range_offset, range_length, kDownloadBlockSize, &source);

        if (bytesRead < 0) {
            status_t err = bytesRead;
            ALOGE("failed to fetch .ts segment at url '%s'", uri.c_str());
            notifyError(err);
            return;
@@ -836,10 +860,13 @@ void PlaylistFetcher::onDownloadNext() {

        CHECK(buffer != NULL);

    err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer);
    if (err == OK) {
        err = checkDecryptPadding(buffer);
    }
        size_t size = buffer->size();
        // Set decryption range.
        buffer->setRange(size - bytesRead, bytesRead);
        status_t err = decryptBuffer(mSeqNumber - firstSeqNumberInPlaylist, buffer,
                buffer->offset() == 0 /* first */);
        // Unset decryption range.
        buffer->setRange(0, size);

        if (err != OK) {
            ALOGE("decryptBuffer failed w/ error %d", err);
@@ -870,7 +897,21 @@ void PlaylistFetcher::onDownloadNext() {
            }
        }

    err = extractAndQueueAccessUnits(buffer, itemMeta);
        err = OK;
        if (bufferStartsWithTsSyncByte(buffer)) {
            // Incremental extraction is only supported for MPEG2 transport streams.
            if (tsBuffer == NULL) {
                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
                tsBuffer->setRange(0, 0);
            } else if (tsBuffer->capacity() != buffer->capacity()) {
                size_t tsOff = tsBuffer->offset(), tsSize = tsBuffer->size();
                tsBuffer = new ABuffer(buffer->data(), buffer->capacity());
                tsBuffer->setRange(tsOff, tsSize);
            }
            tsBuffer->setRange(tsBuffer->offset(), tsBuffer->size() + bytesRead);

            err = extractAndQueueAccessUnitsFromTs(tsBuffer);
        }

        if (err == -EAGAIN) {
            // bad starting sequence number hint
@@ -889,11 +930,69 @@ void PlaylistFetcher::onDownloadNext() {
            return;
        }

        mStartup = false;
    } while (bytesRead != 0);

    if (bufferStartsWithTsSyncByte(buffer)) {
        // If we still don't see a stream after fetching a full ts segment mark it as
        // nonexistent.
        const size_t kNumTypes = ATSParser::NUM_SOURCE_TYPES;
        ATSParser::SourceType srcTypes[kNumTypes] =
                { ATSParser::VIDEO, ATSParser::AUDIO };
        LiveSession::StreamType streamTypes[kNumTypes] =
                { LiveSession::STREAMTYPE_VIDEO, LiveSession::STREAMTYPE_AUDIO };

        for (size_t i = 0; i < kNumTypes; i++) {
            ATSParser::SourceType srcType = srcTypes[i];
            LiveSession::StreamType streamType = streamTypes[i];

            sp<AnotherPacketSource> source =
                static_cast<AnotherPacketSource *>(
                    mTSParser->getSource(srcType).get());

            if (source == NULL) {
                ALOGW("MPEG2 Transport stream does not contain %s data.",
                      srcType == ATSParser::VIDEO ? "video" : "audio");

                mStreamTypeMask &= ~streamType;
                mPacketSources.removeItem(streamType);
            }
        }

    }

    if (checkDecryptPadding(buffer) != OK) {
        ALOGE("Incorrect padding bytes after decryption.");
        notifyError(ERROR_MALFORMED);
        return;
    }

    status_t err = OK;
    if (tsBuffer != NULL) {
        AString method;
        CHECK(buffer->meta()->findString("cipher-method", &method));
        if ((tsBuffer->size() > 0 && method == "NONE")
                || tsBuffer->size() > 16) {
            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
                    "bytes in length.");
            notifyError(ERROR_MALFORMED);
            return;
        }
    }

    // bulk extract non-ts files
    if (tsBuffer == NULL) {
      err = extractAndQueueAccessUnits(buffer, itemMeta);
    }

    if (err != OK) {
        notifyError(err);
        return;
    }

    ++mSeqNumber;

    postMonitorQueue();

    mStartup = false;
}

int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
@@ -928,17 +1027,7 @@ int32_t PlaylistFetcher::getSeqNumberForTime(int64_t timeUs) const {
    return firstSeqNumberInPlaylist + index;
}

status_t PlaylistFetcher::extractAndQueueAccessUnits(
        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
    if (buffer->size() > 0 && buffer->data()[0] == 0x47) {
        // Let's assume this is an MPEG2 transport stream.

        if ((buffer->size() % 188) != 0) {
            ALOGE("MPEG2 transport stream is not an even multiple of 188 "
                  "bytes in length.");
            return ERROR_MALFORMED;
        }

status_t PlaylistFetcher::extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer) {
    if (mTSParser == NULL) {
        // Use TS_TIMESTAMPS_ARE_ABSOLUTE so pts carry over between fetchers.
        mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
@@ -957,7 +1046,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
    }

    size_t offset = 0;
        while (offset < buffer->size()) {
    while (offset + 188 <= buffer->size()) {
        status_t err = mTSParser->feedTSPacket(buffer->data() + offset, 188);

        if (err != OK) {
@@ -966,6 +1055,8 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(

        offset += 188;
    }
    // setRange to indicate consumed bytes.
    buffer->setRange(buffer->offset() + offset, buffer->size() - offset);

    status_t err = OK;
    for (size_t i = mPacketSources.size(); i-- > 0;) {
@@ -975,7 +1066,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
        ATSParser::SourceType type;
        const LiveSession::StreamType stream = mPacketSources.keyAt(i);
        switch (stream) {

            case LiveSession::STREAMTYPE_VIDEO:
                type = ATSParser::VIDEO;
                key = "timeUsVideo";
@@ -1002,11 +1092,6 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
                    mTSParser->getSource(type).get());

        if (source == NULL) {
                ALOGW("MPEG2 Transport stream does not contain %s data.",
                      type == ATSParser::VIDEO ? "video" : "audio");

                mStreamTypeMask &= ~mPacketSources.keyAt(i);
                mPacketSources.removeItemsAt(i);
            continue;
        }

@@ -1069,7 +1154,7 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
                accessUnit->meta()->setObject("format", format);
            }

                // Stash the sequence number so we can hint future fetchers where to start at.
            // Stash the sequence number so we can hint future playlist where to start at.
            accessUnit->meta()->setInt32("seq", mSeqNumber);
            packetSource->queueAccessUnit(accessUnit);
        }
@@ -1094,7 +1179,11 @@ status_t PlaylistFetcher::extractAndQueueAccessUnits(
    }

    return OK;
    } else if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) {
}

status_t PlaylistFetcher::extractAndQueueAccessUnits(
        const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta) {
    if (buffer->size() >= 7 && !memcmp("WEBVTT\n", buffer->data(), 7)) {
        if (mStreamTypeMask != LiveSession::STREAMTYPE_SUBTITLES) {
            ALOGE("This stream only contains subtitles.");
            return ERROR_MALFORMED;
+5 −0
Original line number Diff line number Diff line
@@ -87,8 +87,11 @@ private:

    static const int64_t kMinBufferedDurationUs;
    static const int64_t kMaxMonitorDelayUs;
    static const int32_t kDownloadBlockSize;
    static const int32_t kNumSkipFrames;

    static bool bufferStartsWithTsSyncByte(const sp<ABuffer>& buffer);

    // notifications to mSession
    sp<AMessage> mNotify;
    sp<AMessage> mStartTimeUsNotify;
@@ -169,6 +172,8 @@ private:
    // Resume a fetcher to continue until the stopping point stored in msg.
    status_t onResumeUntil(const sp<AMessage> &msg);

    status_t extractAndQueueAccessUnitsFromTs(const sp<ABuffer> &buffer);

    status_t extractAndQueueAccessUnits(
            const sp<ABuffer> &buffer, const sp<AMessage> &itemMeta);

+3 −2
Original line number Diff line number Diff line
@@ -71,8 +71,9 @@ struct ATSParser : public RefBase {
    void signalEOS(status_t finalResult);

    enum SourceType {
        VIDEO,
        AUDIO
        VIDEO = 0,
        AUDIO = 1,
        NUM_SOURCE_TYPES = 2
    };
    sp<MediaSource> getSource(SourceType type);