Loading tv/tuner/1.0/default/Demux.cpp +93 −31 Original line number Diff line number Diff line Loading @@ -106,7 +106,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } else { filterId = ++mLastUsedFilterId; mDemuxCallbacks.resize(filterId + 1); mFilterCallbacks.resize(filterId + 1); mFilterMQs.resize(filterId + 1); mFilterEvents.resize(filterId + 1); mFilterEventFlags.resize(filterId + 1); Loading @@ -114,6 +114,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, mFilterThreads.resize(filterId + 1); mFilterPids.resize(filterId + 1); mFilterOutputs.resize(filterId + 1); mFilterStatus.resize(filterId + 1); } mUsedFilterIds.insert(filterId); Loading @@ -125,7 +126,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } // Add callback mDemuxCallbacks[filterId] = cb; mFilterCallbacks[filterId] = cb; // Mapping from the filter ID to the filter event DemuxFilterEvent event{ Loading Loading @@ -211,9 +212,16 @@ Return<Result> Demux::stopFilter(uint32_t filterId) { return Result::SUCCESS; } Return<Result> Demux::flushFilter(uint32_t /* filterId */) { Return<Result> Demux::flushFilter(uint32_t filterId) { ALOGV("%s", __FUNCTION__); // temp implementation to flush the FMQ int size = mFilterMQs[filterId]->availableToRead(); char* buffer = new char[size]; mOutputMQ->read((unsigned char*)&buffer[0], size); delete[] buffer; mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; return Result::SUCCESS; } Loading Loading @@ -254,7 +262,7 @@ Return<Result> Demux::close() { mFilterThreads.clear(); mUnusedFilterIds.clear(); mUsedFilterIds.clear(); mDemuxCallbacks.clear(); mFilterCallbacks.clear(); mFilterMQs.clear(); mFilterEvents.clear(); mFilterEventFlags.clear(); Loading Loading @@ -458,32 +466,54 @@ Result Demux::startSectionFilterHandler(uint32_t filterId) { Result Demux::startPesFilterHandler(uint32_t filterId) { std::lock_guard<std::mutex> lock(mFilterEventLock); DemuxFilterPesEvent pesEvent; if (mFilterOutputs[filterId].empty()) { return Result::SUCCESS; } for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) { uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40; uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4; ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl); if (pusi && (adaptFieldControl == 0x01)) { if (mPesSizeLeft == 0) { uint32_t prefix = (mFilterOutputs[filterId][i + 4] << 16) | (mFilterOutputs[filterId][i + 5] << 8) | mFilterOutputs[filterId][i + 6]; ALOGD("[Demux] prefix %d", prefix); if (prefix == 0x000001) { // TODO handle mulptiple Pes filters mPesSizeLeft = (mFilterOutputs[filterId][i + 7] << 8) | mFilterOutputs[filterId][i + 8]; ALOGD("[Demux] pes data length %d", mPesSizeLeft); } else { continue; } } int endPoint = min(184, mPesSizeLeft); // append data and check size vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4; vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187; vector<uint8_t> filterOutData(first, last); if (!writeDataToFilterMQ(filterOutData, filterId)) { vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 3 + endPoint; mPesOutput.insert(mPesOutput.end(), first, last); // size does not match then continue mPesSizeLeft -= endPoint; if (mPesSizeLeft > 0) { continue; } // size match then create event if (!writeDataToFilterMQ(mPesOutput, filterId)) { mFilterOutputs[filterId].clear(); return Result::INVALID_STATE; } maySendFilterStatusCallback(filterId); DemuxFilterPesEvent pesEvent; pesEvent = { // temp dump meta data .streamId = filterOutData[3], .dataLength = static_cast<uint16_t>(filterOutData.size()), .streamId = mPesOutput[3], .dataLength = static_cast<uint16_t>(mPesOutput.size()), }; ALOGD("[Demux] assembled pes data length %d", pesEvent.dataLength); int size = mFilterEvents[filterId].events.size(); mFilterEvents[filterId].events.resize(size + 1); mFilterEvents[filterId].events[size].pes(pesEvent); } mPesOutput.clear(); } mFilterOutputs[filterId].clear(); Loading Loading @@ -672,8 +702,10 @@ void Demux::filterThreadLoop(uint32_t filterId) { continue; } // After successfully write, send a callback and wait for the read to be done mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]); break; } Loading @@ -693,18 +725,20 @@ void Demux::filterThreadLoop(uint32_t filterId) { break; } if (mDemuxCallbacks[filterId] == nullptr) { if (mFilterCallbacks[filterId] == nullptr) { ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId); break; } maySendFilterStatusCallback(filterId); while (mFilterThreadRunning[filterId]) { std::lock_guard<std::mutex> lock(mFilterEventLock); if (mFilterEvents[filterId].events.size() == 0) { continue; } // After successfully write, send a callback and wait for the read to be done mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); break; } Loading Loading @@ -755,7 +789,7 @@ void Demux::maySendInputStatusCallback() { int availableToWrite = mInputMQ->availableToWrite(); DemuxInputStatus newStatus = checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, mInputSettings.lowThreshold); if (mIntputStatus != newStatus) { mInputCallback->onInputStatus(newStatus); Loading @@ -763,7 +797,22 @@ void Demux::maySendInputStatusCallback() { } } DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, void Demux::maySendFilterStatusCallback(uint32_t filterId) { std::lock_guard<std::mutex> lock(mFilterStatusLock); int availableToRead = mFilterMQs[filterId]->availableToRead(); int availableToWrite = mInputMQ->availableToWrite(); int fmqSize = mFilterMQs[filterId]->getQuantumCount(); DemuxFilterStatus newStatus = checkFilterStatusChange(filterId, availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25)); if (mFilterStatus[filterId] != newStatus) { mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus); mFilterStatus[filterId] = newStatus; } } DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold) { if (availableToWrite == 0) { return DemuxInputStatus::SPACE_FULL; Loading @@ -777,6 +826,19 @@ DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t av return mIntputStatus; } DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold) { if (availableToWrite == 0) { return DemuxFilterStatus::OVERFLOW; } else if (availableToRead > highThreshold) { return DemuxFilterStatus::HIGH_WATER; } else if (availableToRead < lowThreshold) { return DemuxFilterStatus::LOW_WATER; } return mFilterStatus[filterId]; } Result Demux::startBroadcastInputLoop() { pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this); pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread"); Loading Loading @@ -818,7 +880,7 @@ void Demux::broadcastInputThreadLoop() { } // filter and dispatch filter output vector<uint8_t> byteBuffer; byteBuffer.resize(sizeof(buffer)); byteBuffer.resize(packetSize); for (int index = 0; index < byteBuffer.size(); index++) { byteBuffer[index] = static_cast<uint8_t>(buffer[index]); } Loading tv/tuner/1.0/default/Demux.h +15 −3 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <android/hardware/tv/tuner/1.0/IDemux.h> #include <fmq/MessageQueue.h> #include <math.h> #include <set> #include "Frontend.h" #include "Tuner.h" Loading Loading @@ -153,8 +154,12 @@ class Demux : public IDemux { bool readDataFromMQ(); bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data); void maySendInputStatusCallback(); DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, void maySendFilterStatusCallback(uint32_t filterId); DemuxInputStatus checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold); DemuxFilterStatus checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold); /** * A dispatcher to read and dispatch input data to all the started filters. * Each filter handler handles the data filtering/output writing/filterEvent updating. Loading Loading @@ -203,7 +208,7 @@ class Demux : public IDemux { /** * Demux callbacks used on filter events or IO buffer status */ vector<sp<IDemuxCallback>> mDemuxCallbacks; vector<sp<IDemuxCallback>> mFilterCallbacks; sp<IDemuxCallback> mInputCallback; sp<IDemuxCallback> mOutputCallback; bool mInputConfigured = false; Loading @@ -219,6 +224,7 @@ class Demux : public IDemux { // FMQ status local records DemuxInputStatus mIntputStatus; vector<DemuxFilterStatus> mFilterStatus; /** * If a specific filter's writing loop is still running */ Loading @@ -239,6 +245,7 @@ class Demux : public IDemux { * Lock to protect writes to the input status */ std::mutex mInputStatusLock; std::mutex mFilterStatusLock; std::mutex mBroadcastInputThreadLock; std::mutex mFilterThreadLock; std::mutex mInputThreadLock; Loading @@ -247,6 +254,11 @@ class Demux : public IDemux { * TODO make this dynamic/random/can take as a parameter */ const uint16_t SECTION_WRITE_COUNT = 10; // temp handle single PES filter // TODO handle mulptiple Pes filters int mPesSizeLeft = 0; vector<uint8_t> mPesOutput; }; } // namespace implementation Loading Loading
tv/tuner/1.0/default/Demux.cpp +93 −31 Original line number Diff line number Diff line Loading @@ -106,7 +106,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } else { filterId = ++mLastUsedFilterId; mDemuxCallbacks.resize(filterId + 1); mFilterCallbacks.resize(filterId + 1); mFilterMQs.resize(filterId + 1); mFilterEvents.resize(filterId + 1); mFilterEventFlags.resize(filterId + 1); Loading @@ -114,6 +114,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, mFilterThreads.resize(filterId + 1); mFilterPids.resize(filterId + 1); mFilterOutputs.resize(filterId + 1); mFilterStatus.resize(filterId + 1); } mUsedFilterIds.insert(filterId); Loading @@ -125,7 +126,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } // Add callback mDemuxCallbacks[filterId] = cb; mFilterCallbacks[filterId] = cb; // Mapping from the filter ID to the filter event DemuxFilterEvent event{ Loading Loading @@ -211,9 +212,16 @@ Return<Result> Demux::stopFilter(uint32_t filterId) { return Result::SUCCESS; } Return<Result> Demux::flushFilter(uint32_t /* filterId */) { Return<Result> Demux::flushFilter(uint32_t filterId) { ALOGV("%s", __FUNCTION__); // temp implementation to flush the FMQ int size = mFilterMQs[filterId]->availableToRead(); char* buffer = new char[size]; mOutputMQ->read((unsigned char*)&buffer[0], size); delete[] buffer; mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; return Result::SUCCESS; } Loading Loading @@ -254,7 +262,7 @@ Return<Result> Demux::close() { mFilterThreads.clear(); mUnusedFilterIds.clear(); mUsedFilterIds.clear(); mDemuxCallbacks.clear(); mFilterCallbacks.clear(); mFilterMQs.clear(); mFilterEvents.clear(); mFilterEventFlags.clear(); Loading Loading @@ -458,32 +466,54 @@ Result Demux::startSectionFilterHandler(uint32_t filterId) { Result Demux::startPesFilterHandler(uint32_t filterId) { std::lock_guard<std::mutex> lock(mFilterEventLock); DemuxFilterPesEvent pesEvent; if (mFilterOutputs[filterId].empty()) { return Result::SUCCESS; } for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) { uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40; uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4; ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl); if (pusi && (adaptFieldControl == 0x01)) { if (mPesSizeLeft == 0) { uint32_t prefix = (mFilterOutputs[filterId][i + 4] << 16) | (mFilterOutputs[filterId][i + 5] << 8) | mFilterOutputs[filterId][i + 6]; ALOGD("[Demux] prefix %d", prefix); if (prefix == 0x000001) { // TODO handle mulptiple Pes filters mPesSizeLeft = (mFilterOutputs[filterId][i + 7] << 8) | mFilterOutputs[filterId][i + 8]; ALOGD("[Demux] pes data length %d", mPesSizeLeft); } else { continue; } } int endPoint = min(184, mPesSizeLeft); // append data and check size vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4; vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187; vector<uint8_t> filterOutData(first, last); if (!writeDataToFilterMQ(filterOutData, filterId)) { vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 3 + endPoint; mPesOutput.insert(mPesOutput.end(), first, last); // size does not match then continue mPesSizeLeft -= endPoint; if (mPesSizeLeft > 0) { continue; } // size match then create event if (!writeDataToFilterMQ(mPesOutput, filterId)) { mFilterOutputs[filterId].clear(); return Result::INVALID_STATE; } maySendFilterStatusCallback(filterId); DemuxFilterPesEvent pesEvent; pesEvent = { // temp dump meta data .streamId = filterOutData[3], .dataLength = static_cast<uint16_t>(filterOutData.size()), .streamId = mPesOutput[3], .dataLength = static_cast<uint16_t>(mPesOutput.size()), }; ALOGD("[Demux] assembled pes data length %d", pesEvent.dataLength); int size = mFilterEvents[filterId].events.size(); mFilterEvents[filterId].events.resize(size + 1); mFilterEvents[filterId].events[size].pes(pesEvent); } mPesOutput.clear(); } mFilterOutputs[filterId].clear(); Loading Loading @@ -672,8 +702,10 @@ void Demux::filterThreadLoop(uint32_t filterId) { continue; } // After successfully write, send a callback and wait for the read to be done mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]); break; } Loading @@ -693,18 +725,20 @@ void Demux::filterThreadLoop(uint32_t filterId) { break; } if (mDemuxCallbacks[filterId] == nullptr) { if (mFilterCallbacks[filterId] == nullptr) { ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId); break; } maySendFilterStatusCallback(filterId); while (mFilterThreadRunning[filterId]) { std::lock_guard<std::mutex> lock(mFilterEventLock); if (mFilterEvents[filterId].events.size() == 0) { continue; } // After successfully write, send a callback and wait for the read to be done mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); break; } Loading Loading @@ -755,7 +789,7 @@ void Demux::maySendInputStatusCallback() { int availableToWrite = mInputMQ->availableToWrite(); DemuxInputStatus newStatus = checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, mInputSettings.lowThreshold); if (mIntputStatus != newStatus) { mInputCallback->onInputStatus(newStatus); Loading @@ -763,7 +797,22 @@ void Demux::maySendInputStatusCallback() { } } DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, void Demux::maySendFilterStatusCallback(uint32_t filterId) { std::lock_guard<std::mutex> lock(mFilterStatusLock); int availableToRead = mFilterMQs[filterId]->availableToRead(); int availableToWrite = mInputMQ->availableToWrite(); int fmqSize = mFilterMQs[filterId]->getQuantumCount(); DemuxFilterStatus newStatus = checkFilterStatusChange(filterId, availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25)); if (mFilterStatus[filterId] != newStatus) { mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus); mFilterStatus[filterId] = newStatus; } } DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold) { if (availableToWrite == 0) { return DemuxInputStatus::SPACE_FULL; Loading @@ -777,6 +826,19 @@ DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t av return mIntputStatus; } DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold) { if (availableToWrite == 0) { return DemuxFilterStatus::OVERFLOW; } else if (availableToRead > highThreshold) { return DemuxFilterStatus::HIGH_WATER; } else if (availableToRead < lowThreshold) { return DemuxFilterStatus::LOW_WATER; } return mFilterStatus[filterId]; } Result Demux::startBroadcastInputLoop() { pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this); pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread"); Loading Loading @@ -818,7 +880,7 @@ void Demux::broadcastInputThreadLoop() { } // filter and dispatch filter output vector<uint8_t> byteBuffer; byteBuffer.resize(sizeof(buffer)); byteBuffer.resize(packetSize); for (int index = 0; index < byteBuffer.size(); index++) { byteBuffer[index] = static_cast<uint8_t>(buffer[index]); } Loading
tv/tuner/1.0/default/Demux.h +15 −3 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <android/hardware/tv/tuner/1.0/IDemux.h> #include <fmq/MessageQueue.h> #include <math.h> #include <set> #include "Frontend.h" #include "Tuner.h" Loading Loading @@ -153,8 +154,12 @@ class Demux : public IDemux { bool readDataFromMQ(); bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data); void maySendInputStatusCallback(); DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, void maySendFilterStatusCallback(uint32_t filterId); DemuxInputStatus checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold); DemuxFilterStatus checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold); /** * A dispatcher to read and dispatch input data to all the started filters. * Each filter handler handles the data filtering/output writing/filterEvent updating. Loading Loading @@ -203,7 +208,7 @@ class Demux : public IDemux { /** * Demux callbacks used on filter events or IO buffer status */ vector<sp<IDemuxCallback>> mDemuxCallbacks; vector<sp<IDemuxCallback>> mFilterCallbacks; sp<IDemuxCallback> mInputCallback; sp<IDemuxCallback> mOutputCallback; bool mInputConfigured = false; Loading @@ -219,6 +224,7 @@ class Demux : public IDemux { // FMQ status local records DemuxInputStatus mIntputStatus; vector<DemuxFilterStatus> mFilterStatus; /** * If a specific filter's writing loop is still running */ Loading @@ -239,6 +245,7 @@ class Demux : public IDemux { * Lock to protect writes to the input status */ std::mutex mInputStatusLock; std::mutex mFilterStatusLock; std::mutex mBroadcastInputThreadLock; std::mutex mFilterThreadLock; std::mutex mInputThreadLock; Loading @@ -247,6 +254,11 @@ class Demux : public IDemux { * TODO make this dynamic/random/can take as a parameter */ const uint16_t SECTION_WRITE_COUNT = 10; // temp handle single PES filter // TODO handle mulptiple Pes filters int mPesSizeLeft = 0; vector<uint8_t> mPesOutput; }; } // namespace implementation Loading