Loading tv/tuner/1.0/default/Demux.cpp +53 −12 Original line number Diff line number Diff line Loading @@ -106,7 +106,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } else { filterId = ++mLastUsedFilterId; mDemuxCallbacks.resize(filterId + 1); mFilterCallbacks.resize(filterId + 1); mFilterMQs.resize(filterId + 1); mFilterEvents.resize(filterId + 1); mFilterEventFlags.resize(filterId + 1); Loading @@ -114,6 +114,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, mFilterThreads.resize(filterId + 1); mFilterPids.resize(filterId + 1); mFilterOutputs.resize(filterId + 1); mFilterStatus.resize(filterId + 1); } mUsedFilterIds.insert(filterId); Loading @@ -125,7 +126,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } // Add callback mDemuxCallbacks[filterId] = cb; mFilterCallbacks[filterId] = cb; // Mapping from the filter ID to the filter event DemuxFilterEvent event{ Loading Loading @@ -211,9 +212,16 @@ Return<Result> Demux::stopFilter(uint32_t filterId) { return Result::SUCCESS; } Return<Result> Demux::flushFilter(uint32_t /* filterId */) { Return<Result> Demux::flushFilter(uint32_t filterId) { ALOGV("%s", __FUNCTION__); // temp implementation to flush the FMQ int size = mFilterMQs[filterId]->availableToRead(); char* buffer = new char[size]; mOutputMQ->read((unsigned char*)&buffer[0], size); delete[] buffer; mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; return Result::SUCCESS; } Loading Loading @@ -254,7 +262,7 @@ Return<Result> Demux::close() { mFilterThreads.clear(); mUnusedFilterIds.clear(); mUsedFilterIds.clear(); mDemuxCallbacks.clear(); mFilterCallbacks.clear(); mFilterMQs.clear(); mFilterEvents.clear(); mFilterEventFlags.clear(); Loading Loading @@ -475,6 +483,7 @@ Result Demux::startPesFilterHandler(uint32_t filterId) { mFilterOutputs[filterId].clear(); return Result::INVALID_STATE; } maySendFilterStatusCallback(filterId); pesEvent = { // temp dump meta data .streamId = filterOutData[3], Loading Loading @@ -672,8 +681,10 @@ void Demux::filterThreadLoop(uint32_t filterId) { continue; } // After successfully write, send a callback and wait for the read to be done mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]); break; } Loading @@ -693,18 +704,20 @@ void Demux::filterThreadLoop(uint32_t filterId) { break; } if (mDemuxCallbacks[filterId] == nullptr) { if (mFilterCallbacks[filterId] == nullptr) { ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId); break; } maySendFilterStatusCallback(filterId); while (mFilterThreadRunning[filterId]) { std::lock_guard<std::mutex> lock(mFilterEventLock); if (mFilterEvents[filterId].events.size() == 0) { continue; } // After successfully write, send a callback and wait for the read to be done mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); break; } Loading Loading @@ -755,7 +768,7 @@ void Demux::maySendInputStatusCallback() { int availableToWrite = mInputMQ->availableToWrite(); DemuxInputStatus newStatus = checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, mInputSettings.lowThreshold); if (mIntputStatus != newStatus) { mInputCallback->onInputStatus(newStatus); Loading @@ -763,7 +776,22 @@ void Demux::maySendInputStatusCallback() { } } DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, void Demux::maySendFilterStatusCallback(uint32_t filterId) { std::lock_guard<std::mutex> lock(mFilterStatusLock); int availableToRead = mFilterMQs[filterId]->availableToRead(); int availableToWrite = mInputMQ->availableToWrite(); int fmqSize = mFilterMQs[filterId]->getQuantumCount(); DemuxFilterStatus newStatus = checkFilterStatusChange(filterId, availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25)); if (mFilterStatus[filterId] != newStatus) { mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus); mFilterStatus[filterId] = newStatus; } } DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold) { if (availableToWrite == 0) { return DemuxInputStatus::SPACE_FULL; Loading @@ -777,6 +805,19 @@ DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t av return mIntputStatus; } DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold) { if (availableToWrite == 0) { return DemuxFilterStatus::OVERFLOW; } else if (availableToRead > highThreshold) { return DemuxFilterStatus::HIGH_WATER; } else if (availableToRead < lowThreshold) { return DemuxFilterStatus::LOW_WATER; } return mFilterStatus[filterId]; } Result Demux::startBroadcastInputLoop() { pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this); pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread"); Loading Loading @@ -818,7 +859,7 @@ void Demux::broadcastInputThreadLoop() { } // filter and dispatch filter output vector<uint8_t> byteBuffer; byteBuffer.resize(sizeof(buffer)); byteBuffer.resize(packetSize); for (int index = 0; index < byteBuffer.size(); index++) { byteBuffer[index] = static_cast<uint8_t>(buffer[index]); } Loading tv/tuner/1.0/default/Demux.h +10 −3 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <android/hardware/tv/tuner/1.0/IDemux.h> #include <fmq/MessageQueue.h> #include <math.h> #include <set> #include "Frontend.h" #include "Tuner.h" Loading Loading @@ -153,8 +154,12 @@ class Demux : public IDemux { bool readDataFromMQ(); bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data); void maySendInputStatusCallback(); DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, void maySendFilterStatusCallback(uint32_t filterId); DemuxInputStatus checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold); DemuxFilterStatus checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold); /** * A dispatcher to read and dispatch input data to all the started filters. * Each filter handler handles the data filtering/output writing/filterEvent updating. Loading Loading @@ -203,7 +208,7 @@ class Demux : public IDemux { /** * Demux callbacks used on filter events or IO buffer status */ vector<sp<IDemuxCallback>> mDemuxCallbacks; vector<sp<IDemuxCallback>> mFilterCallbacks; sp<IDemuxCallback> mInputCallback; sp<IDemuxCallback> mOutputCallback; bool mInputConfigured = false; Loading @@ -219,6 +224,7 @@ class Demux : public IDemux { // FMQ status local records DemuxInputStatus mIntputStatus; vector<DemuxFilterStatus> mFilterStatus; /** * If a specific filter's writing loop is still running */ Loading @@ -239,6 +245,7 @@ class Demux : public IDemux { * Lock to protect writes to the input status */ std::mutex mInputStatusLock; std::mutex mFilterStatusLock; std::mutex mBroadcastInputThreadLock; std::mutex mFilterThreadLock; std::mutex mInputThreadLock; Loading Loading
tv/tuner/1.0/default/Demux.cpp +53 −12 Original line number Diff line number Diff line Loading @@ -106,7 +106,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } else { filterId = ++mLastUsedFilterId; mDemuxCallbacks.resize(filterId + 1); mFilterCallbacks.resize(filterId + 1); mFilterMQs.resize(filterId + 1); mFilterEvents.resize(filterId + 1); mFilterEventFlags.resize(filterId + 1); Loading @@ -114,6 +114,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, mFilterThreads.resize(filterId + 1); mFilterPids.resize(filterId + 1); mFilterOutputs.resize(filterId + 1); mFilterStatus.resize(filterId + 1); } mUsedFilterIds.insert(filterId); Loading @@ -125,7 +126,7 @@ Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, } // Add callback mDemuxCallbacks[filterId] = cb; mFilterCallbacks[filterId] = cb; // Mapping from the filter ID to the filter event DemuxFilterEvent event{ Loading Loading @@ -211,9 +212,16 @@ Return<Result> Demux::stopFilter(uint32_t filterId) { return Result::SUCCESS; } Return<Result> Demux::flushFilter(uint32_t /* filterId */) { Return<Result> Demux::flushFilter(uint32_t filterId) { ALOGV("%s", __FUNCTION__); // temp implementation to flush the FMQ int size = mFilterMQs[filterId]->availableToRead(); char* buffer = new char[size]; mOutputMQ->read((unsigned char*)&buffer[0], size); delete[] buffer; mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; return Result::SUCCESS; } Loading Loading @@ -254,7 +262,7 @@ Return<Result> Demux::close() { mFilterThreads.clear(); mUnusedFilterIds.clear(); mUsedFilterIds.clear(); mDemuxCallbacks.clear(); mFilterCallbacks.clear(); mFilterMQs.clear(); mFilterEvents.clear(); mFilterEventFlags.clear(); Loading Loading @@ -475,6 +483,7 @@ Result Demux::startPesFilterHandler(uint32_t filterId) { mFilterOutputs[filterId].clear(); return Result::INVALID_STATE; } maySendFilterStatusCallback(filterId); pesEvent = { // temp dump meta data .streamId = filterOutData[3], Loading Loading @@ -672,8 +681,10 @@ void Demux::filterThreadLoop(uint32_t filterId) { continue; } // After successfully write, send a callback and wait for the read to be done mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); mFilterStatus[filterId] = DemuxFilterStatus::DATA_READY; mFilterCallbacks[filterId]->onFilterStatus(filterId, mFilterStatus[filterId]); break; } Loading @@ -693,18 +704,20 @@ void Demux::filterThreadLoop(uint32_t filterId) { break; } if (mDemuxCallbacks[filterId] == nullptr) { if (mFilterCallbacks[filterId] == nullptr) { ALOGD("[Demux] filter %d does not hava callback. Ending thread", filterId); break; } maySendFilterStatusCallback(filterId); while (mFilterThreadRunning[filterId]) { std::lock_guard<std::mutex> lock(mFilterEventLock); if (mFilterEvents[filterId].events.size() == 0) { continue; } // After successfully write, send a callback and wait for the read to be done mDemuxCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterCallbacks[filterId]->onFilterEvent(mFilterEvents[filterId]); mFilterEvents[filterId].events.resize(0); break; } Loading Loading @@ -755,7 +768,7 @@ void Demux::maySendInputStatusCallback() { int availableToWrite = mInputMQ->availableToWrite(); DemuxInputStatus newStatus = checkStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, checkInputStatusChange(availableToWrite, availableToRead, mInputSettings.highThreshold, mInputSettings.lowThreshold); if (mIntputStatus != newStatus) { mInputCallback->onInputStatus(newStatus); Loading @@ -763,7 +776,22 @@ void Demux::maySendInputStatusCallback() { } } DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, void Demux::maySendFilterStatusCallback(uint32_t filterId) { std::lock_guard<std::mutex> lock(mFilterStatusLock); int availableToRead = mFilterMQs[filterId]->availableToRead(); int availableToWrite = mInputMQ->availableToWrite(); int fmqSize = mFilterMQs[filterId]->getQuantumCount(); DemuxFilterStatus newStatus = checkFilterStatusChange(filterId, availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25)); if (mFilterStatus[filterId] != newStatus) { mFilterCallbacks[filterId]->onFilterStatus(filterId, newStatus); mFilterStatus[filterId] = newStatus; } } DemuxInputStatus Demux::checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold) { if (availableToWrite == 0) { return DemuxInputStatus::SPACE_FULL; Loading @@ -777,6 +805,19 @@ DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t av return mIntputStatus; } DemuxFilterStatus Demux::checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold) { if (availableToWrite == 0) { return DemuxFilterStatus::OVERFLOW; } else if (availableToRead > highThreshold) { return DemuxFilterStatus::HIGH_WATER; } else if (availableToRead < lowThreshold) { return DemuxFilterStatus::LOW_WATER; } return mFilterStatus[filterId]; } Result Demux::startBroadcastInputLoop() { pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this); pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread"); Loading Loading @@ -818,7 +859,7 @@ void Demux::broadcastInputThreadLoop() { } // filter and dispatch filter output vector<uint8_t> byteBuffer; byteBuffer.resize(sizeof(buffer)); byteBuffer.resize(packetSize); for (int index = 0; index < byteBuffer.size(); index++) { byteBuffer[index] = static_cast<uint8_t>(buffer[index]); } Loading
tv/tuner/1.0/default/Demux.h +10 −3 Original line number Diff line number Diff line Loading @@ -19,6 +19,7 @@ #include <android/hardware/tv/tuner/1.0/IDemux.h> #include <fmq/MessageQueue.h> #include <math.h> #include <set> #include "Frontend.h" #include "Tuner.h" Loading Loading @@ -153,8 +154,12 @@ class Demux : public IDemux { bool readDataFromMQ(); bool writeSectionsAndCreateEvent(uint32_t filterId, vector<uint8_t> data); void maySendInputStatusCallback(); DemuxInputStatus checkStatusChange(uint32_t availableToWrite, uint32_t availableToRead, void maySendFilterStatusCallback(uint32_t filterId); DemuxInputStatus checkInputStatusChange(uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold); DemuxFilterStatus checkFilterStatusChange(uint32_t filterId, uint32_t availableToWrite, uint32_t availableToRead, uint32_t highThreshold, uint32_t lowThreshold); /** * A dispatcher to read and dispatch input data to all the started filters. * Each filter handler handles the data filtering/output writing/filterEvent updating. Loading Loading @@ -203,7 +208,7 @@ class Demux : public IDemux { /** * Demux callbacks used on filter events or IO buffer status */ vector<sp<IDemuxCallback>> mDemuxCallbacks; vector<sp<IDemuxCallback>> mFilterCallbacks; sp<IDemuxCallback> mInputCallback; sp<IDemuxCallback> mOutputCallback; bool mInputConfigured = false; Loading @@ -219,6 +224,7 @@ class Demux : public IDemux { // FMQ status local records DemuxInputStatus mIntputStatus; vector<DemuxFilterStatus> mFilterStatus; /** * If a specific filter's writing loop is still running */ Loading @@ -239,6 +245,7 @@ class Demux : public IDemux { * Lock to protect writes to the input status */ std::mutex mInputStatusLock; std::mutex mFilterStatusLock; std::mutex mBroadcastInputThreadLock; std::mutex mFilterThreadLock; std::mutex mInputThreadLock; Loading