Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e223baab authored by Amy's avatar Amy
Browse files

Tuner HAL Demux Playback interface implementation

Test: manual
Bug: 135709325
Change-Id: I0b673159b667c5bde47e9ed285cfa1bdc6c668c6
parent 00c63bb5
Loading
Loading
Loading
Loading
+383 −117
Original line number Diff line number Diff line
@@ -73,34 +73,6 @@ Demux::Demux(uint32_t demuxId) {

Demux::~Demux() {}

bool Demux::createAndSaveMQ(uint32_t bufferSize, uint32_t filterId) {
    ALOGV("%s", __FUNCTION__);

    // Create a synchronized FMQ that supports blocking read/write
    std::unique_ptr<FilterMQ> tmpFilterMQ =
            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
    if (!tmpFilterMQ->isValid()) {
        ALOGW("Failed to create FMQ of filter with id: %d", filterId);
        return false;
    }

    mFilterMQs.resize(filterId + 1);
    mFilterMQs[filterId] = std::move(tmpFilterMQ);

    EventFlag* mFilterEventFlag;
    if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &mFilterEventFlag) !=
        OK) {
        return false;
    }
    mFilterEventFlags.resize(filterId + 1);
    mFilterEventFlags[filterId] = mFilterEventFlag;
    mFilterWriteCount.resize(filterId + 1);
    mFilterWriteCount[filterId] = 0;
    mThreadRunning.resize(filterId + 1);

    return true;
}

Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) {
    ALOGV("%s", __FUNCTION__);

@@ -113,23 +85,42 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
                              const sp<IDemuxCallback>& cb, addFilter_cb _hidl_cb) {
    ALOGV("%s", __FUNCTION__);

    uint32_t filterId = mLastUsedFilterId + 1;
    mLastUsedFilterId += 1;
    uint32_t filterId;

    if (!mUnusedFilterIds.empty()) {
        filterId = *mUnusedFilterIds.begin();

        mUnusedFilterIds.erase(filterId);
    } else {
        filterId = ++mLastUsedFilterId;

        mDemuxCallbacks.resize(filterId + 1);
        mFilterMQs.resize(filterId + 1);
        mFilterEvents.resize(filterId + 1);
        mFilterEventFlags.resize(filterId + 1);
        mFilterThreadRunning.resize(filterId + 1);
        mFilterThreads.resize(filterId + 1);
    }

    mUsedFilterIds.insert(filterId);

    if ((type != DemuxFilterType::PCR || type != DemuxFilterType::TS) && cb == nullptr) {
        ALOGW("callback can't be null");
        _hidl_cb(Result::INVALID_ARGUMENT, filterId);
        return Void();
    }

    // Add callback
    mDemuxCallbacks.resize(filterId + 1);
    mDemuxCallbacks[filterId] = cb;

    // Mapping from the filter ID to the filter type
    mFilterTypes.resize(filterId + 1);
    mFilterTypes[filterId] = type;
    // Mapping from the filter ID to the filter event
    DemuxFilterEvent event{
            .filterId = filterId,
            .filterType = type,
    };
    mFilterEvents[filterId] = event;

    if (!createAndSaveMQ(bufferSize, filterId)) {
    if (!createFilterMQ(bufferSize, filterId)) {
        _hidl_cb(Result::UNKNOWN_ERROR, -1);
        return Void();
    }
@@ -141,8 +132,8 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize,
Return<void> Demux::getFilterQueueDesc(uint32_t filterId, getFilterQueueDesc_cb _hidl_cb) {
    ALOGV("%s", __FUNCTION__);

    if (filterId < 0 || filterId > mLastUsedFilterId) {
        ALOGW("No filter with id: %d exists", filterId);
    if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
        ALOGW("No filter with id: %d exists to get desc", filterId);
        _hidl_cb(Result::INVALID_ARGUMENT, FilterMQ::Descriptor());
        return Void();
    }
@@ -160,35 +151,29 @@ Return<Result> Demux::configureFilter(uint32_t /* filterId */,

Return<Result> Demux::startFilter(uint32_t filterId) {
    ALOGV("%s", __FUNCTION__);
    Result result;

    if (filterId < 0 || filterId > mLastUsedFilterId) {
        ALOGW("No filter with id: %d exists", filterId);
    if (mUsedFilterIds.find(filterId) == mUsedFilterIds.end()) {
        ALOGW("No filter with id: %d exists to start filter", filterId);
        return Result::INVALID_ARGUMENT;
    }

    DemuxFilterType filterType = mFilterTypes[filterId];
    Result result;
    DemuxFilterEvent event{
            .filterId = filterId,
            .filterType = filterType,
    };

    switch (filterType) {
    switch (mFilterEvents[filterId].filterType) {
        case DemuxFilterType::SECTION:
            result = startSectionFilterHandler(event);
            result = startFilterLoop(filterId);
            break;
        case DemuxFilterType::PES:
            result = startPesFilterHandler(event);
            result = startPesFilterHandler(filterId);
            break;
        case DemuxFilterType::TS:
            result = startTsFilterHandler();
            return Result::SUCCESS;
        case DemuxFilterType::AUDIO:
        case DemuxFilterType::VIDEO:
            result = startMediaFilterHandler(event);
            result = startMediaFilterHandler(filterId);
            break;
        case DemuxFilterType::RECORD:
            result = startRecordFilterHandler(event);
            result = startRecordFilterHandler(filterId);
            break;
        case DemuxFilterType::PCR:
            result = startPcrFilterHandler();
@@ -212,9 +197,13 @@ Return<Result> Demux::flushFilter(uint32_t /* filterId */) {
    return Result::SUCCESS;
}

Return<Result> Demux::removeFilter(uint32_t /* filterId */) {
Return<Result> Demux::removeFilter(uint32_t filterId) {
    ALOGV("%s", __FUNCTION__);

    // resetFilterRecords(filterId);
    mUsedFilterIds.erase(filterId);
    mUnusedFilterIds.insert(filterId);

    return Result::SUCCESS;
}

@@ -239,48 +228,191 @@ Return<void> Demux::getAvSyncTime(AvSyncHwId /* avSyncHwId */, getAvSyncTime_cb
Return<Result> Demux::close() {
    ALOGV("%s", __FUNCTION__);

    set<uint32_t>::iterator it;
    mInputThread = 0;
    mOutputThread = 0;
    mFilterThreads.clear();
    mUnusedFilterIds.clear();
    mUsedFilterIds.clear();
    mDemuxCallbacks.clear();
    mFilterMQs.clear();
    mFilterEvents.clear();
    mFilterEventFlags.clear();
    mLastUsedFilterId = -1;

    return Result::SUCCESS;
}

bool Demux::writeSectionsAndCreateEvent(DemuxFilterEvent& event, uint32_t sectionNum) {
    event.events.resize(sectionNum);
    for (int i = 0; i < sectionNum; i++) {
        DemuxFilterSectionEvent secEvent;
        secEvent = {
                // temp dump meta data
                .tableId = 0,
                .version = 1,
                .sectionNum = 1,
                .dataLength = 530,
        };
        event.events[i].section(secEvent);
        if (!writeDataToFilterMQ(fakeDataInputBuffer, event.filterId)) {
            return false;
Return<Result> Demux::addOutput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
    ALOGV("%s", __FUNCTION__);

    // Create a synchronized FMQ that supports blocking read/write
    std::unique_ptr<FilterMQ> tmpFilterMQ =
            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
    if (!tmpFilterMQ->isValid()) {
        ALOGW("Failed to create output FMQ");
        return Result::UNKNOWN_ERROR;
    }

    mOutputMQ = std::move(tmpFilterMQ);

    if (EventFlag::createEventFlag(mOutputMQ->getEventFlagWord(), &mOutputEventFlag) != OK) {
        return Result::UNKNOWN_ERROR;
    }
    return true;

    mOutputCallback = cb;

    return Result::SUCCESS;
}

bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId) {
    std::lock_guard<std::mutex> lock(mWriteLock);
    if (mFilterMQs[filterId]->write(data.data(), data.size())) {
        return true;
Return<void> Demux::getOutputQueueDesc(getOutputQueueDesc_cb _hidl_cb) {
    ALOGV("%s", __FUNCTION__);

    if (!mOutputMQ) {
        _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
        return Void();
    }
    return false;

    _hidl_cb(Result::SUCCESS, *mOutputMQ->getDesc());
    return Void();
}

Return<Result> Demux::configureOutput(const DemuxOutputSettings& /* settings */) {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Demux::attachOutputTsFilter(uint32_t /*filterId*/) {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Demux::detachOutputTsFilter(uint32_t /* filterId */) {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Demux::startOutput() {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Demux::stopOutput() {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Demux::flushOutput() {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Demux::removeOutput() {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Demux::addInput(uint32_t bufferSize, const sp<IDemuxCallback>& cb) {
    ALOGV("%s", __FUNCTION__);

    // Create a synchronized FMQ that supports blocking read/write
    std::unique_ptr<FilterMQ> tmpInputMQ =
            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
    if (!tmpInputMQ->isValid()) {
        ALOGW("Failed to create input FMQ");
        return Result::UNKNOWN_ERROR;
    }

    mInputMQ = std::move(tmpInputMQ);

    if (EventFlag::createEventFlag(mInputMQ->getEventFlagWord(), &mInputEventFlag) != OK) {
        return Result::UNKNOWN_ERROR;
    }

    mInputCallback = cb;

    return Result::SUCCESS;
}

Return<void> Demux::getInputQueueDesc(getInputQueueDesc_cb _hidl_cb) {
    ALOGV("%s", __FUNCTION__);

    if (!mInputMQ) {
        _hidl_cb(Result::NOT_INITIALIZED, FilterMQ::Descriptor());
        return Void();
    }

    _hidl_cb(Result::SUCCESS, *mInputMQ->getDesc());
    return Void();
}

Return<Result> Demux::configureInput(const DemuxInputSettings& /* settings */) {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Demux::startInput() {
    ALOGV("%s", __FUNCTION__);

    pthread_create(&mInputThread, NULL, __threadLoopInput, this);
    pthread_setname_np(mInputThread, "demux_input_waiting_loop");

    // TODO start another thread to send filter status callback to the framework

    return Result::SUCCESS;
}

Return<Result> Demux::stopInput() {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Return<Result> Demux::flushInput() {
    ALOGV("%s", __FUNCTION__);

    return Result::SUCCESS;
}

Result Demux::startSectionFilterHandler(DemuxFilterEvent event) {
Return<Result> Demux::removeInput() {
    ALOGV("%s", __FUNCTION__);

    mInputMQ = nullptr;

    return Result::SUCCESS;
}

Result Demux::startFilterLoop(uint32_t filterId) {
    struct ThreadArgs* threadArgs = (struct ThreadArgs*)malloc(sizeof(struct ThreadArgs));
    threadArgs->user = this;
    threadArgs->event = &event;
    threadArgs->filterId = filterId;

    pthread_create(&mThreadId, NULL, __threadLoop, (void*)threadArgs);
    pthread_setname_np(mThreadId, "demux_filter_waiting_loop");
    pthread_t mFilterThread;
    pthread_create(&mFilterThread, NULL, __threadLoopFilter, (void*)threadArgs);
    mFilterThreads[filterId] = mFilterThread;
    pthread_setname_np(mFilterThread, "demux_filter_waiting_loop");

    return Result::SUCCESS;
}

Result Demux::startPesFilterHandler(DemuxFilterEvent& event) {
Result Demux::startSectionFilterHandler(uint32_t filterId, vector<uint8_t> data) {
    if (!writeSectionsAndCreateEvent(filterId, data)) {
        ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
        return Result::UNKNOWN_ERROR;
    }

    return Result::SUCCESS;
}

Result Demux::startPesFilterHandler(uint32_t filterId) {
    // TODO generate multiple events in one event callback
    DemuxFilterPesEvent pesEvent;
    pesEvent = {
@@ -288,19 +420,19 @@ Result Demux::startPesFilterHandler(DemuxFilterEvent& event) {
            .streamId = 0,
            .dataLength = 530,
    };
    event.events.resize(1);
    event.events[0].pes(pesEvent);
    mFilterEvents[filterId].events.resize(1);
    mFilterEvents[filterId].events[0].pes(pesEvent);
    /*pthread_create(&mThreadId, NULL, __threadLoop, this);
    pthread_setname_np(mThreadId, "demux_section_filter_waiting_loop");*/
    if (!writeDataToFilterMQ(fakeDataInputBuffer, event.filterId)) {
    if (!writeDataToFilterMQ(fakeDataInputBuffer, filterId)) {
        return Result::INVALID_STATE;
    }

    if (mDemuxCallbacks[event.filterId] == nullptr) {
    if (mDemuxCallbacks[filterId] == nullptr) {
        return Result::NOT_INITIALIZED;
    }

    mDemuxCallbacks[event.filterId]->onFilterEvent(event);
    mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
    return Result::SUCCESS;
}

@@ -309,7 +441,7 @@ Result Demux::startTsFilterHandler() {
    return Result::SUCCESS;
}

Result Demux::startMediaFilterHandler(DemuxFilterEvent& event) {
Result Demux::startMediaFilterHandler(uint32_t filterId) {
    DemuxFilterMediaEvent mediaEvent;
    mediaEvent = {
            // temp dump meta data
@@ -317,13 +449,13 @@ Result Demux::startMediaFilterHandler(DemuxFilterEvent& event) {
            .dataLength = 530,
            .secureMemory = nullptr,
    };
    event.events.resize(1);
    event.events[0].media() = mediaEvent;
    mFilterEvents[filterId].events.resize(1);
    mFilterEvents[filterId].events[0].media() = mediaEvent;
    // TODO handle write FQM for media stream
    return Result::SUCCESS;
}

Result Demux::startRecordFilterHandler(DemuxFilterEvent& event) {
Result Demux::startRecordFilterHandler(uint32_t filterId) {
    DemuxFilterRecordEvent recordEvent;
    recordEvent = {
            // temp dump meta data
@@ -331,8 +463,8 @@ Result Demux::startRecordFilterHandler(DemuxFilterEvent& event) {
            .packetNum = 0,
    };
    recordEvent.indexMask.tsIndexMask() = 0x01;
    event.events.resize(1);
    event.events[0].ts() = recordEvent;
    mFilterEvents[filterId].events.resize(1);
    mFilterEvents[filterId].events[0].ts() = recordEvent;
    return Result::SUCCESS;
}

@@ -341,60 +473,194 @@ Result Demux::startPcrFilterHandler() {
    return Result::SUCCESS;
}

void* Demux::__threadLoop(void* threadArg) {
bool Demux::createFilterMQ(uint32_t bufferSize, uint32_t filterId) {
    ALOGV("%s", __FUNCTION__);

    // Create a synchronized FMQ that supports blocking read/write
    std::unique_ptr<FilterMQ> tmpFilterMQ =
            std::unique_ptr<FilterMQ>(new (std::nothrow) FilterMQ(bufferSize, true));
    if (!tmpFilterMQ->isValid()) {
        ALOGW("Failed to create FMQ of filter with id: %d", filterId);
        return false;
    }

    mFilterMQs[filterId] = std::move(tmpFilterMQ);

    EventFlag* filterEventFlag;
    if (EventFlag::createEventFlag(mFilterMQs[filterId]->getEventFlagWord(), &filterEventFlag) !=
        OK) {
        return false;
    }
    mFilterEventFlags[filterId] = filterEventFlag;

    return true;
}

bool Demux::writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data) {
    // TODO check how many sections has been read
    std::lock_guard<std::mutex> lock(mFilterEventLock);
    int size = mFilterEvents[filterId].events.size();
    mFilterEvents[filterId].events.resize(size + 1);
    if (!writeDataToFilterMQ(data, filterId)) {
        return false;
    }
    DemuxFilterSectionEvent secEvent;
    secEvent = {
            // temp dump meta data
            .tableId = 0,
            .version = 1,
            .sectionNum = 1,
            .dataLength = 530,
    };
    mFilterEvents[filterId].events[size].section(secEvent);
    return true;
}

bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filterId) {
    std::lock_guard<std::mutex> lock(mWriteLock);
    if (mFilterMQs[filterId]->write(data.data(), data.size())) {
        return true;
    }
    return false;
}

bool Demux::filterAndOutputData() {
    ALOGD("[Demux] start to dispatch data to filters");
    // Read input data from the input FMQ
    int size = mInputMQ->availableToRead();
    vector<uint8_t> dataOutputBuffer;
    dataOutputBuffer.resize(size);
    mInputMQ->read(dataOutputBuffer.data(), size);

    Result result;
    // Filter the data and feed the output to each filter
    set<uint32_t>::iterator it;
    for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) {
        switch (mFilterEvents[*it].filterType) {
            case DemuxFilterType::SECTION:
                result = startSectionFilterHandler(*it, dataOutputBuffer);
                break;
            case DemuxFilterType::PES:
                result = startPesFilterHandler(*it);
                break;
            case DemuxFilterType::TS:
                result = startTsFilterHandler();
                break;
            case DemuxFilterType::AUDIO:
            case DemuxFilterType::VIDEO:
                result = startMediaFilterHandler(*it);
                break;
            case DemuxFilterType::RECORD:
                result = startRecordFilterHandler(*it);
                break;
            case DemuxFilterType::PCR:
                result = startPcrFilterHandler();
                break;
            default:
                return false;
        }
    }

    return result == Result::SUCCESS;
}

void* Demux::__threadLoopFilter(void* threadArg) {
    Demux* const self = static_cast<Demux*>(((struct ThreadArgs*)threadArg)->user);
    self->filterThreadLoop(((struct ThreadArgs*)threadArg)->event);
    self->filterThreadLoop(((struct ThreadArgs*)threadArg)->filterId);
    return 0;
}

void* Demux::__threadLoopInput(void* user) {
    Demux* const self = static_cast<Demux*>(user);
    self->inputThreadLoop();
    return 0;
}

void Demux::filterThreadLoop(DemuxFilterEvent* event) {
    uint32_t filterId = event->filterId;
void Demux::filterThreadLoop(uint32_t filterId) {
    ALOGD("[Demux] filter %d threadLoop start.", filterId);
    mThreadRunning[filterId] = true;
    mFilterThreadRunning[filterId] = true;

    // For the first time of filter output, implementation needs to send the filter
    // Event Callback without waiting for the DATA_CONSUMED to init the process.
    while (mFilterThreadRunning[filterId]) {
        if (mFilterEvents[filterId].events.size() == 0) {
            ALOGD("[Demux] wait for filter data output.");
            usleep(1000 * 1000);
            continue;
        }
        // After successfully write, send a callback and wait for the read to be done
        mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
        mFilterEvents[filterId].events.resize(0);
        break;
    }

    while (mThreadRunning[filterId]) {
    while (mFilterThreadRunning[filterId]) {
        uint32_t efState = 0;
        // We do not wait for the last round of writen data to be read to finish the thread
        // because the VTS can verify the reading itself.
        for (int i = 0; i < SECTION_WRITE_COUNT; i++) {
            DemuxFilterEvent filterEvent{
                    .filterId = filterId,
                    .filterType = event->filterType,
            };
            if (!writeSectionsAndCreateEvent(filterEvent, 2)) {
                ALOGD("[Demux] filter %d fails to write into FMQ. Ending thread", filterId);
            while (mFilterThreadRunning[filterId]) {
                status_t status = mFilterEventFlags[filterId]->wait(
                        static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
                        WAIT_TIMEOUT, true /* retry on spurious wake */);
                if (status != OK) {
                    ALOGD("[Demux] wait for data consumed");
                    continue;
                }
                break;
            }
            mFilterWriteCount[filterId]++;

            if (mDemuxCallbacks[filterId] == nullptr) {
                ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId);
                break;
            }

            while (mFilterThreadRunning[filterId]) {
                std::lock_guard<std::mutex> lock(mFilterEventLock);
                if (mFilterEvents[filterId].events.size() == 0) {
                    continue;
                }
                // After successfully write, send a callback and wait for the read to be done
            mDemuxCallbacks[filterId]->onFilterEvent(filterEvent);
                mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]);
                mFilterEvents[filterId].events.resize(0);
                break;
            }
            // We do not wait for the last read to be done
            // VTS can verify the read result itself.
            if (i == SECTION_WRITE_COUNT - 1) {
                ALOGD("[Demux] filter %d writing done. Ending thread", filterId);
                break;
            }
            while (mThreadRunning[filterId]) {
                status_t status = mFilterEventFlags[filterId]->wait(
                        static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_CONSUMED), &efState,
                        WAIT_TIMEOUT, true /* retry on spurious wake */);
        }
        mFilterThreadRunning[filterId] = false;
    }

    ALOGD("[Demux] filter thread ended.");
}

void Demux::inputThreadLoop() {
    ALOGD("[Demux] input threadLoop start.");
    mInputThreadRunning = true;

    while (mInputThreadRunning) {
        uint32_t efState = 0;
        status_t status =
                mInputEventFlag->wait(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY),
                                      &efState, WAIT_TIMEOUT, true /* retry on spurious wake */);
        if (status != OK) {
                    ALOGD("[Demux] wait for data consumed");
            ALOGD("[Demux] wait for data ready on the input FMQ");
            continue;
        }
        // Our current implementation filter the data and write it into the filter FMQ immedaitely
        // after the DATA_READY from the VTS/framework
        if (!filterAndOutputData()) {
            ALOGD("[Demux] input data failed to be filtered. Ending thread");
            break;
        }
    }

        mFilterWriteCount[filterId] = 0;
        mThreadRunning[filterId] = false;
    }

    ALOGD("[Demux] filter thread ended.");
    mInputThreadRunning = false;
    ALOGD("[Demux] input thread ended.");
}

}  // namespace implementation
+97 −24

File changed.

Preview size limit exceeded, changes collapsed.