Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c13371c6 authored by Amy's avatar Amy
Browse files

Adding a TS filter functionality into the Demux default impl

Test: atest
Bug: 135709325
Change-Id: I149104fd4c7d1ce413036b147365a49973455e72
(cherry picked from commit 42a5b4b8)
parent 292d7ecc
Loading
Loading
Loading
Loading
+124 −52
Original line number Original line Diff line number Diff line
@@ -100,6 +100,8 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
        mFilterEventFlags.resize(filterId + 1);
        mFilterEventFlags.resize(filterId + 1);
        mFilterThreadRunning.resize(filterId + 1);
        mFilterThreadRunning.resize(filterId + 1);
        mFilterThreads.resize(filterId + 1);
        mFilterThreads.resize(filterId + 1);
        mFilterPids.resize(filterId + 1);
        mFilterOutputs.resize(filterId + 1);
    }
    }


    mUsedFilterIds.insert(filterId);
    mUsedFilterIds.insert(filterId);
@@ -142,52 +144,56 @@ Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb
    return Void();
    return Void();
}
}


Return<Result> Demux::configureFilter(uint32_t /* filterId */,
Return<Result> Demux::configureFilter(uint32_t filterId, const DemuxFilterSettings& settings) {
                                      const DemuxFilterSettings& /* settings */) {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    return Result::SUCCESS;
}

Return<Result> Demux::startFilter(uint32_t filterId) {
    ALOGV("%s", __FUNCTION__);
    Result result;

    if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
        ALOGW("No filter with id: %d exists to start filter", filterId);
        return Result::INVALID_ARGUMENT;
    }

    switch (mFilterEvents[filterId].filterType) {
    switch (mFilterEvents[filterId].filterType) {
        case DemuxFilterType::SECTION:
        case DemuxFilterType::SECTION:
            result = startFilterLoop(filterId);
            mFilterPids[filterId] = settings.section().tpid;
            break;
            break;
        case DemuxFilterType::PES:
        case DemuxFilterType::PES:
            result = startPesFilterHandler(filterId);
            mFilterPids[filterId] = settings.pesData().tpid;
            break;
            break;
        case DemuxFilterType::TS:
        case DemuxFilterType::TS:
            result = startTsFilterHandler();
            mFilterPids[filterId] = settings.ts().tpid;
            return Result::SUCCESS;
            break;
        case DemuxFilterType::AUDIO:
        case DemuxFilterType::AUDIO:
            mFilterPids[filterId] = settings.audio().tpid;
            break;
        case DemuxFilterType::VIDEO:
        case DemuxFilterType::VIDEO:
            result = startMediaFilterHandler(filterId);
            mFilterPids[filterId] = settings.video().tpid;
            break;
            break;
        case DemuxFilterType::RECORD:
        case DemuxFilterType::RECORD:
            result = startRecordFilterHandler(filterId);
            mFilterPids[filterId] = settings.record().tpid;
            break;
            break;
        case DemuxFilterType::PCR:
        case DemuxFilterType::PCR:
            result = startPcrFilterHandler();
            mFilterPids[filterId] = settings.pcr().tpid;
            return Result::SUCCESS;
            break;
        default:
        default:
            return Result::UNKNOWN_ERROR;
            return Result::UNKNOWN_ERROR;
    }
    }
    return Result::SUCCESS;
}

Return<Result> Demux::startFilter(uint32_t filterId) {
    ALOGV("%s", __FUNCTION__);
    Result result;

    if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
        ALOGW("No filter with id: %d exists to start filter", filterId);
        return Result::INVALID_ARGUMENT;
    }

    result = startFilterLoop(filterId);


    return result;
    return result;
}
}


Return<Result> Demux::stopFilter(uint32_t /* filterId */) {
Return<Result> Demux::stopFilter(uint32_t filterId) {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    mFilterThreadRunning[filterId] = false;

    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


@@ -238,6 +244,8 @@ Return<Result> Demux::close() {
    mFilterMQs.clear();
    mFilterMQs.clear();
    mFilterEvents.clear();
    mFilterEvents.clear();
    mFilterEventFlags.clear();
    mFilterEventFlags.clear();
    mFilterOutputs.clear();
    mFilterPids.clear();
    mLastUsedFilterId = -1;
    mLastUsedFilterId = -1;


    return Result::SUCCESS;
    return Result::SUCCESS;
@@ -277,19 +285,21 @@ Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
    return Void();
    return Void();
}
}


Return<Result> Demux::configureOutput(const DemuxOutputSettings& /* settings */) {
Return<Result> Demux::configureOutput(const DemuxOutputSettings& settings) {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    mOutputConfigured = true;
    mOutputSettings = settings;
    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


Return<Result> Demux::attachOutputTsFilter(uint32_t /*filterId*/) {
Return<Result> Demux::attachOutputFilter(uint32_t /*filterId*/) {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


Return<Result> Demux::detachOutputTsFilter(uint32_t /* filterId */) {
Return<Result> Demux::detachOutputFilter(uint32_t /* filterId */) {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    return Result::SUCCESS;
    return Result::SUCCESS;
@@ -353,15 +363,26 @@ Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
    return Void();
    return Void();
}
}


Return<Result> Demux::configureInput(const DemuxInputSettings& /* settings */) {
Return<Result> Demux::configureInput(const DemuxInputSettings& settings) {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    mInputConfigured = true;
    mInputSettings = settings;

    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


Return<Result> Demux::startInput() {
Return<Result> Demux::startInput() {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    if (!mInputCallback) {
        return Result::NOT_INITIALIZED;
    }

    if (!mInputConfigured) {
        return Result::INVALID_STATE;
    }

    pthread_create(&mInputThread, NULL, __threadLoopInput, this);
    pthread_create(&mInputThread, NULL, __threadLoopInput, this);
    pthread_setname_np(mInputThread, "demux_input_waiting_loop");
    pthread_setname_np(mInputThread, "demux_input_waiting_loop");


@@ -373,6 +394,8 @@ Return<Result> Demux::startInput() {
Return<Result> Demux::stopInput() {
Return<Result> Demux::stopInput() {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    mInputThreadRunning = false;

    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


@@ -403,36 +426,43 @@ Result Demux::startFilterLoop(uint32_t filterId) {
    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


Result Demux::startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data) {
Result Demux::startSectionFilterHandler(uint32_t filterId) {
    if (!writeSectionsAndCreateEvent(filterId, data)) {
    if (mFilterOutputs[filterId].empty()) {
        return Result::SUCCESS;
    }
    if (!writeSectionsAndCreateEvent(filterId, mFilterOutputs[filterId])) {
        ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
        ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
        return Result::UNKNOWN_ERROR;
        return Result::UNKNOWN_ERROR;
    }
    }


    mFilterOutputs[filterId].clear();

    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


Result Demux::startPesFilterHandler(uint32_t filterId) {
Result Demux::startPesFilterHandler(uint32_t filterId) {
    // TODO generate multiple events in one event callback
    std::lock_guard<std::mutex> lock(mFilterEventLock);
    DemuxFilterPesEvent pesEvent;
    DemuxFilterPesEvent pesEvent;
    if (mFilterOutputs[filterId].empty()) {
        return Result::SUCCESS;
    }

    // TODO extract PES from TS
    if (!writeDataToFilterMQ(mFilterOutputs[filterId], filterId)) {
        mFilterOutputs[filterId].clear();
        return Result::INVALID_STATE;
    }
    pesEvent = {
    pesEvent = {
            // temp dump meta data
            // temp dump meta data
            .streamId = 0,
            .streamId = 0,
            .dataLength = 530,
            .dataLength = static_cast<uint16_t>(mFilterOutputs[filterId].size()),
    };
    };
    mFilterEvents[filterId].events.resize(1);
    int size = mFilterEvents[filterId].events.size();
    mFilterEvents[filterId].events[0].pes(pesEvent);
    mFilterEvents[filterId].events.resize(size + 1);
    /*pthread_create(&mThreadId, NULL, __threadLoop, this);
    mFilterEvents[filterId].events[size].pes(pesEvent);
    pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/
    if (!writeDataToFilterMQ(fakeDataInputBuffer, filterId)) {
        return Result::INVALID_STATE;
    }


    if (mDemuxCallbacks[filterId] == nullptr) {
    mFilterOutputs[filterId].clear();
        return Result::NOT_INITIALIZED;
    }


    mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


@@ -499,18 +529,18 @@ bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
    // TODO check how many sections has been read
    // TODO check how many sections has been read
    std::lock_guard<std::mutex> lock(mFilterEventLock);
    std::lock_guard<std::mutex> lock(mFilterEventLock);
    int size = mFilterEvents[filterId].events.size();
    mFilterEvents[filterId].events.resize(size + 1);
    if (!writeDataToFilterMQ(data, filterId)) {
    if (!writeDataToFilterMQ(data, filterId)) {
        return false;
        return false;
    }
    }
    int size = mFilterEvents[filterId].events.size();
    mFilterEvents[filterId].events.resize(size + 1);
    DemuxFilterSectionEvent secEvent;
    DemuxFilterSectionEvent secEvent;
    secEvent = {
    secEvent = {
            // temp dump meta data
            // temp dump meta data
            .tableId = 0,
            .tableId = 0,
            .version = 1,
            .version = 1,
            .sectionNum = 1,
            .sectionNum = 1,
            .dataLength = 530,
            .dataLength = static_cast<uint16_t>(data.size()),
    };
    };
    mFilterEvents[filterId].events[size].section(secEvent);
    mFilterEvents[filterId].events[size].section(secEvent);
    return true;
    return true;
@@ -525,20 +555,32 @@ bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filte
}
}


bool Demux::filterAndOutputData() {
bool Demux::filterAndOutputData() {
    ALOGD("[Demux] start to dispatch data to filters");
    Result result;
    set<uint32_t>::iterator it;

    // Read input data from the input FMQ
    // Read input data from the input FMQ
    int size = mInputMQ->availableToRead();
    int size = mInputMQ->availableToRead();
    int inputPacketSize = mInputSettings.packetSize;
    vector<uint8_t> dataOutputBuffer;
    vector<uint8_t> dataOutputBuffer;
    dataOutputBuffer.resize(size);
    dataOutputBuffer.resize(inputPacketSize);
    mInputMQ->read(dataOutputBuffer.data(), size);


    Result result;
    // Dispatch the packet to the PID matching filter output buffer
    // Filter the data and feed the output to each filter
    for (int i = 0; i < size / inputPacketSize; i++) {
    set<uint32_t>::iterator it;
        mInputMQ->read(dataOutputBuffer.data(), inputPacketSize);
        for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
            uint16_t pid = ((dataOutputBuffer[1] & 0x1f) << 8) | ((dataOutputBuffer[2] & 0xff));
            if (pid == mFilterPids[*it]) {
                mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), dataOutputBuffer.begin(),
                                           dataOutputBuffer.end());
            }
        }
    }

    // Handle the output data per filter type
    for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
    for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
        switch (mFilterEvents[*it].filterType) {
        switch (mFilterEvents[*it].filterType) {
            case DemuxFilterType::SECTION:
            case DemuxFilterType::SECTION:
                result = startSectionFilterHandler(*it, dataOutputBuffer);
                result = startSectionFilterHandler(*it);
                break;
                break;
            case DemuxFilterType::PES:
            case DemuxFilterType::PES:
                result = startPesFilterHandler(*it);
                result = startPesFilterHandler(*it);
@@ -657,12 +699,42 @@ void Demux::inputThreadLoop() {
            ALOGD("[Demux] input data failed to be filtered. Ending thread");
            ALOGD("[Demux] input data failed to be filtered. Ending thread");
            break;
            break;
        }
        }

        maySendInputStatusCallback();
    }
    }


    mInputThreadRunning = false;
    mInputThreadRunning = false;
    ALOGD("[Demux] input thread ended.");
    ALOGD("[Demux] input thread ended.");
}
}


void Demux::maySendInputStatusCallback() {
    std::lock_guard<std::mutex> lock(mInputStatusLock);
    int availableToRead = mInputMQ->availableToRead();
    int availableToWrite = mInputMQ->availableToWrite();

    DemuxInputStatus newStatus =
            checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold,
                              mInputSettings.lowThreshold);
    if (mIntputStatus != newStatus) {
        mInputCallback->onInputStatus(newStatus);
        mIntputStatus = newStatus;
    }
}

DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
                                          uint32_t highThreshold, uint32_t lowThreshold) {
    if (availableToWrite == 0) {
        return DemuxInputStatus::SPACE_FULL;
    } else if (availableToRead > highThreshold) {
        return DemuxInputStatus::SPACE_ALMOST_FULL;
    } else if (availableToRead < lowThreshold) {
        return DemuxInputStatus::SPACE_ALMOST_EMPTY;
    } else if (availableToRead == 0) {
        return DemuxInputStatus::SPACE_EMPTY;
    }
    return mIntputStatus;
}

}  // namespace implementation
}  // namespace implementation
}  // namespace V1_0
}  // namespace V1_0
}  // namespace tuner
}  // namespace tuner
+21 −3
Original line number Original line Diff line number Diff line
@@ -91,9 +91,9 @@ class Demux : public IDemux {


    virtual Return<Result> configureOutput(const DemuxOutputSettings& settings) override;
    virtual Return<Result> configureOutput(const DemuxOutputSettings& settings) override;


    virtual Return<Result> attachOutputTsFilter(uint32_t filterId) override;
    virtual Return<Result> attachOutputFilter(uint32_t filterId) override;


    virtual Return<Result> detachOutputTsFilter(uint32_t filterId) override;
    virtual Return<Result> detachOutputFilter(uint32_t filterId) override;


    virtual Return<Result> startOutput() override;
    virtual Return<Result> startOutput() override;


@@ -115,7 +115,7 @@ class Demux : public IDemux {
     * They are also responsible to write the filtered output into the filter FMQ
     * They are also responsible to write the filtered output into the filter FMQ
     * and update the filterEvent bound with the same filterId.
     * and update the filterEvent bound with the same filterId.
     */
     */
    Result startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data);
    Result startSectionFilterHandler(uint32_t filterId);
    Result startPesFilterHandler(uint32_t filterId);
    Result startPesFilterHandler(uint32_t filterId);
    Result startTsFilterHandler();
    Result startTsFilterHandler();
    Result startMediaFilterHandler(uint32_t filterId);
    Result startMediaFilterHandler(uint32_t filterId);
@@ -136,6 +136,9 @@ class Demux : public IDemux {
    bool writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId);
    bool writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId);
    bool readDataFromMQ();
    bool readDataFromMQ();
    bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data);
    bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data);
    void maySendInputStatusCallback();
    DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
                                       uint32_t highThreshold, uint32_t lowThreshold);
    /**
    /**
     * A dispatcher to read and dispatch input data to all the started filters.
     * A dispatcher to read and dispatch input data to all the started filters.
     * Each filter handler handles the data filtering/output writing/filterEvent updating.
     * Each filter handler handles the data filtering/output writing/filterEvent updating.
@@ -169,6 +172,8 @@ class Demux : public IDemux {
     * A list of created FilterMQ ptrs.
     * A list of created FilterMQ ptrs.
     * The array number is the filter ID.
     * The array number is the filter ID.
     */
     */
    vector<uint16_t> mFilterPids;
    vector<vector<uint8_t>> mFilterOutputs;
    vector<unique_ptr<FilterMQ>> mFilterMQs;
    vector<unique_ptr<FilterMQ>> mFilterMQs;
    vector<EventFlag*> mFilterEventFlags;
    vector<EventFlag*> mFilterEventFlags;
    vector<DemuxFilterEvent> mFilterEvents;
    vector<DemuxFilterEvent> mFilterEvents;
@@ -182,10 +187,18 @@ class Demux : public IDemux {
    vector<sp<IDemuxCallback>> mDemuxCallbacks;
    vector<sp<IDemuxCallback>> mDemuxCallbacks;
    sp<IDemuxCallback> mInputCallback;
    sp<IDemuxCallback> mInputCallback;
    sp<IDemuxCallback> mOutputCallback;
    sp<IDemuxCallback> mOutputCallback;
    bool mInputConfigured = false;
    bool mOutputConfigured = false;
    DemuxInputSettings mInputSettings;
    DemuxOutputSettings mOutputSettings;

    // Thread handlers
    // Thread handlers
    pthread_t mInputThread;
    pthread_t mInputThread;
    pthread_t mOutputThread;
    pthread_t mOutputThread;
    vector<pthread_t> mFilterThreads;
    vector<pthread_t> mFilterThreads;

    // FMQ status local records
    DemuxInputStatus mIntputStatus;
    /**
    /**
     * If a specific filter's writing loop is still running
     * If a specific filter's writing loop is still running
     */
     */
@@ -198,7 +211,12 @@ class Demux : public IDemux {
    /**
    /**
     * Lock to protect writes to the filter event
     * Lock to protect writes to the filter event
     */
     */
    // TODO make each filter separate event lock
    std::mutex mFilterEventLock;
    std::mutex mFilterEventLock;
    /**
     * Lock to protect writes to the input status
     */
    std::mutex mInputStatusLock;
    /**
    /**
     * How many times a filter should write
     * How many times a filter should write
     * TODO make this dynamic/random/can take as a parameter
     * TODO make this dynamic/random/can take as a parameter
+1 −7
Original line number Original line Diff line number Diff line
@@ -105,13 +105,7 @@ Return<Result> Frontend::setLna(bool /* bEnable */) {
    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


Return<Result> Frontend::setLnb(const sp<ILnb>& /* lnb */) {
Return<Result> Frontend::setLnb(uint32_t /* lnb */) {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Frontend::sendDiseqcMessage(const hidl_vec<uint8_t>& /* diseqcMessage */) {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    return Result::SUCCESS;
    return Result::SUCCESS;
+1 −3
Original line number Original line Diff line number Diff line
@@ -56,11 +56,9 @@ class Frontend : public IFrontend {
    virtual Return<void> getStatus(const hidl_vec<FrontendStatusType>& statusTypes,
    virtual Return<void> getStatus(const hidl_vec<FrontendStatusType>& statusTypes,
                                   getStatus_cb _hidl_cb) override;
                                   getStatus_cb _hidl_cb) override;


    virtual Return<Result> sendDiseqcMessage(const hidl_vec<uint8_t>& diseqcMessage) override;

    virtual Return<Result> setLna(bool bEnable) override;
    virtual Return<Result> setLna(bool bEnable) override;


    virtual Return<Result> setLnb(const sp<ILnb>& lnb) override;
    virtual Return<Result> setLnb(uint32_t lnb) override;


    FrontendType getFrontendType();
    FrontendType getFrontendType();


+6 −0
Original line number Original line Diff line number Diff line
@@ -48,6 +48,12 @@ Return<Result> Lnb::setSatellitePosition(FrontendLnbPosition /* position */) {
    return Result::SUCCESS;
    return Result::SUCCESS;
}
}


Return<Result> Lnb::sendDiseqcMessage(const hidl_vec<uint8_t>& /* diseqcMessage */) {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Lnb::close() {
Return<Result> Lnb::close() {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


Loading