Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 22408e85 authored by Amy Zhang's avatar Amy Zhang Committed by android-build-merger
Browse files

Merge changes from topic "pes"

am: 04587692

Change-Id: Idcec8afe3cb0a3d02ab4278862b1c623e1b6726b
parents 473f30d3 04587692
Loading
Loading
Loading
Loading
+93 −31
Original line number Diff line number Diff line
@@ -106,7 +106,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
    } else {
        filterId = ++mLastUsedFilterId;

        mDemuxCallbacks.resize(filterId + 1);
        mFilterCallbacks.resize(filterId + 1);
        mFilterMQs.resize(filterId + 1);
        mFilterEvents.resize(filterId + 1);
        mFilterEventFlags.resize(filterId + 1);
@@ -114,6 +114,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
        mFilterThreads.resize(filterId + 1);
        mFilterPids.resize(filterId + 1);
        mFilterOutputs.resize(filterId + 1);
        mFilterStatus.resize(filterId + 1);
    }

    mUsedFilterIds.insert(filterId);
@@ -125,7 +126,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
    }

    // Add callback
    mDemuxCallbacks[filterId] = cb;
    mFilterCallbacks[filterId] = cb;

    // Mapping from the filter ID to the filter event
    DemuxFilterEvent event{
@@ -211,9 +212,16 @@ Return<Result> Demux::stopFilter(uint32_t filterId) {
    return Result::SUCCESS;
}

Return<Result> Demux::flushFilter(uint32_t /* filterId */) {
Return<Result> Demux::flushFilter(uint32_t filterId) {
    ALOGV("%s", __FUNCTION__);

    // temp implementation to flush the FMQ
    int size = mFilterMQs[filterId]->availableToRead();
    char* buffer = new char[size];
    mOutputMQ->read((unsigned char*)&buffer[0], size);
    delete[] buffer;
    mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;

    return Result::SUCCESS;
}

@@ -254,7 +262,7 @@ Return<Result> Demux::close() {
    mFilterThreads.clear();
    mUnusedFilterIds.clear();
    mUsedFilterIds.clear();
    mDemuxCallbacks.clear();
    mFilterCallbacks.clear();
    mFilterMQs.clear();
    mFilterEvents.clear();
    mFilterEventFlags.clear();
@@ -458,32 +466,54 @@ Result Demux::startSectionFilterHandler(uint32_t filterId) {

Result Demux::startPesFilterHandler(uint32_t filterId) {
    std::lock_guard<std::mutex> lock(mFilterEventLock);
    DemuxFilterPesEvent pesEvent;
    if (mFilterOutputs[filterId].empty()) {
        return Result::SUCCESS;
    }

    for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) {
        uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40;
        uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4;
        ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl);
        if (pusi && (adaptFieldControl == 0x01)) {
        if (mPesSizeLeft == 0) {
            uint32_t prefix = (mFilterOutputs[filterId][i + 4] << 16) |
                              (mFilterOutputs[filterId][i + 5] << 8) |
                              mFilterOutputs[filterId][i + 6];
            ALOGD("[Demux] prefix %d", prefix);
            if (prefix == 0x000001) {
                // TODO handle mulptiple Pes filters
                mPesSizeLeft =
                        (mFilterOutputs[filterId][i + 7] << 8) | mFilterOutputs[filterId][i + 8];
                ALOGD("[Demux] pes data length %d", mPesSizeLeft);
            } else {
                continue;
            }
        }

        int endPoint = min(184, mPesSizeLeft);
        // append data and check size
        vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4;
            vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187;
            vector<uint8_t> filterOutData(first, last);
            if (!writeDataToFilterMQ(filterOutData, filterId)) {
        vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 3 + endPoint;
        mPesOutput.insert(mPesOutput.end(), first, last);
        // size does not match then continue
        mPesSizeLeft -= endPoint;
        if (mPesSizeLeft > 0) {
            continue;
        }
        // size match then create event
        if (!writeDataToFilterMQ(mPesOutput, filterId)) {
            mFilterOutputs[filterId].clear();
            return Result::INVALID_STATE;
        }
        maySendFilterStatusCallback(filterId);
        DemuxFilterPesEvent pesEvent;
        pesEvent = {
                // temp dump meta data
                    .streamId = filterOutData[3],
                    .dataLength = static_cast<uint16_t>(filterOutData.size()),
                .streamId = mPesOutput[3],
                .dataLength = static_cast<uint16_t>(mPesOutput.size()),
        };
        ALOGD("[Demux] assembled pes data length %d", pesEvent.dataLength);

        int size = mFilterEvents[filterId].events.size();
        mFilterEvents[filterId].events.resize(size + 1);
        mFilterEvents[filterId].events[size].pes(pesEvent);
        }
        mPesOutput.clear();
    }

    mFilterOutputs[filterId].clear();
@@ -672,8 +702,10 @@ void Demux::filterThreadLoop(uint32_t filterId) {
            continue;
        }
        // After successfully write, send a callback and wait for the read to be done
        mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
        mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
        mFilterEvents[filterId].events.resize(0);
        mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY;
        mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]);
        break;
    }

@@ -693,18 +725,20 @@ void Demux::filterThreadLoop(uint32_t filterId) {
                break;
            }

            if (mDemuxCallbacks[filterId] == nullptr) {
            if (mFilterCallbacks[filterId] == nullptr) {
                ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
                break;
            }

            maySendFilterStatusCallback(filterId);

            while (mFilterThreadRunning[filterId]) {
                std::lock_guard<std::mutex> lock(mFilterEventLock);
                if (mFilterEvents[filterId].events.size() == 0) {
                    continue;
                }
                // After successfully write, send a callback and wait for the read to be done
                mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
                mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
                mFilterEvents[filterId].events.resize(0);
                break;
            }
@@ -755,7 +789,7 @@ void Demux::maySendInputStatusCallback() {
    int availableToWrite = mInputMQ->availableToWrite();

    DemuxInputStatus newStatus =
            checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
            checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
                                   mInputSettings.lowThreshold);
    if (mIntputStatus != newStatus) {
        mInputCallback->onInputStatus(newStatus);
@@ -763,7 +797,22 @@ void Demux::maySendInputStatusCallback() {
    }
}

DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
void Demux::maySendFilterStatusCallback(uint32_t filterId) {
    std::lock_guard<std::mutex> lock(mFilterStatusLock);
    int availableToRead = mFilterMQs[filterId]->availableToRead();
    int availableToWrite = mInputMQ->availableToWrite();
    int fmqSize = mFilterMQs[filterId]->getQuantumCount();

    DemuxFilterStatus newStatus =
            checkFilterStatusChange(filterId, availableToWrite, availableToRead,
                                    ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
    if (mFilterStatus[filterId] != newStatus) {
        mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus);
        mFilterStatus[filterId] = newStatus;
    }
}

DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
                                               uint32_t highThreshold, uint32_t lowThreshold) {
    if (availableToWrite == 0) {
        return DemuxInputStatus::SPACE_FULL;
@@ -777,6 +826,19 @@ DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t av
    return mIntputStatus;
}

DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
                                                 uint32_t availableToRead, uint32_t highThreshold,
                                                 uint32_t lowThreshold) {
    if (availableToWrite == 0) {
        return DemuxFilterStatus::OVERFLOW;
    } else if (availableToRead > highThreshold) {
        return DemuxFilterStatus::HIGH_WATER;
    } else if (availableToRead < lowThreshold) {
        return DemuxFilterStatus::LOW_WATER;
    }
    return mFilterStatus[filterId];
}

Result Demux::startBroadcastInputLoop() {
    pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this);
    pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread");
@@ -818,7 +880,7 @@ void Demux::broadcastInputThreadLoop() {
                }
                // filter and dispatch filter output
                vector<uint8_t> byteBuffer;
                byteBuffer.resize(sizeof(buffer));
                byteBuffer.resize(packetSize);
                for (int index = 0; index < byteBuffer.size(); index++) {
                    byteBuffer[index] = static_cast<uint8_t>(buffer[index]);
                }
+15 −3
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@

#include <android/hardware/tv/tuner/1.0/IDemux.h>
#include <fmq/MessageQueue.h>
#include <math.h>
#include <set>
#include "Frontend.h"
#include "Tuner.h"
@@ -153,8 +154,12 @@ class Demux : public IDemux {
    bool readDataFromMQ();
    bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data);
    void maySendInputStatusCallback();
    DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
    void maySendFilterStatusCallback(uint32_t filterId);
    DemuxInputStatus checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
                                            uint32_t highThreshold, uint32_t lowThreshold);
    DemuxFilterStatus checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite,
                                              uint32_t availableToRead, uint32_t highThreshold,
                                              uint32_t lowThreshold);
    /**
     * A dispatcher to read and dispatch input data to all the started filters.
     * Each filter handler handles the data filtering/output writing/filterEvent updating.
@@ -203,7 +208,7 @@ class Demux : public IDemux {
    /**
     * Demux callbacks used on filter events or IO buffer status
     */
    vector<sp<IDemuxCallback>> mDemuxCallbacks;
    vector<sp<IDemuxCallback>> mFilterCallbacks;
    sp<IDemuxCallback> mInputCallback;
    sp<IDemuxCallback> mOutputCallback;
    bool mInputConfigured = false;
@@ -219,6 +224,7 @@ class Demux : public IDemux {

    // FMQ status local records
    DemuxInputStatus mIntputStatus;
    vector<DemuxFilterStatus> mFilterStatus;
    /**
     * If a specific filter's writing loop is still running
     */
@@ -239,6 +245,7 @@ class Demux : public IDemux {
     * Lock to protect writes to the input status
     */
    std::mutex mInputStatusLock;
    std::mutex mFilterStatusLock;
    std::mutex mBroadcastInputThreadLock;
    std::mutex mFilterThreadLock;
    std::mutex mInputThreadLock;
@@ -247,6 +254,11 @@ class Demux : public IDemux {
     * TODO make this dynamic/random/can take as a parameter
     */
    const uint16_t SECTION_WRITE_COUNT = 10;

    // temp handle single PES filter
    // TODO handle mulptiple Pes filters
    int mPesSizeLeft = 0;
    vector<uint8_t> mPesOutput;
};

}  // namespace implementation