Loading tv/tuner/1.0/default/Demux.cpp +133 −27 Original line number Diff line number Diff line Loading @@ -67,8 +67,9 @@ const std::vector<uint8_t> fakeDataInputBuffer{ 0x73, 0x63, 0x65, 0x6e, 0x65, }; Demux::Demux(uint32_t demuxId) { Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) { mDemuxId = demuxId; mTunerService = tuner; } Demux::~Demux() {} Loading @@ -76,9 +77,20 @@ Demux::~Demux() {} Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) { ALOGV("%s", __FUNCTION__); mSourceFrontendId = frontendId; if (mTunerService == nullptr) { return Result::NOT_INITIALIZED; } return Result::SUCCESS; mFrontend = mTunerService->getFrontendById(frontendId); if (mFrontend == nullptr) { return Result::INVALID_STATE; } mFrontendSourceFile = mFrontend->getSourceFile(); mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId); return startBroadcastInputLoop(); } Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, Loading Loading @@ -194,6 +206,8 @@ Return<Result> Demux::stopFilter(uint32_t filterId) { mFilterThreadRunning[filterId] = false; std::lock_guard<std::mutex> lock(mFilterThreadLock); return Result::SUCCESS; } Loading Loading @@ -396,6 +410,8 @@ Return<Result> Demux::stopInput() { mInputThreadRunning = false; std::lock_guard<std::mutex> lock(mInputThreadLock); return Result::SUCCESS; } Loading Loading @@ -447,19 +463,28 @@ Result Demux::startPesFilterHandler(uint32_t filterId) { return Result::SUCCESS; } // TODO extract PES from TS if (!writeDataToFilterMQ(mFilterOutputs[filterId], filterId)) { for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) { uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40; uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4; ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl); if (pusi && (adaptFieldControl == 0x01)) { vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4; vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187; vector<uint8_t> filterOutData(first, last); if (!writeDataToFilterMQ(filterOutData, filterId)) { mFilterOutputs[filterId].clear(); return Result::INVALID_STATE; } pesEvent = { // temp dump meta data .streamId = 0, .dataLength = static_cast<uint16_t>(mFilterOutputs[filterId].size()), .streamId = filterOutData[3], .dataLength = static_cast<uint16_t>(filterOutData.size()), }; int size = mFilterEvents[filterId].events.size(); mFilterEvents[filterId].events.resize(size + 1); mFilterEvents[filterId].events[size].pes(pesEvent); } } mFilterOutputs[filterId].clear(); Loading @@ -481,6 +506,8 @@ Result Demux::startMediaFilterHandler(uint32_t filterId) { }; mFilterEvents[filterId].events.resize(1); mFilterEvents[filterId].events[0].media() = mediaEvent; mFilterOutputs[filterId].clear(); // TODO handle write FQM for media stream return Result::SUCCESS; } Loading @@ -495,6 +522,8 @@ Result Demux::startRecordFilterHandler(uint32_t filterId) { recordEvent.indexMask.tsIndexMask() = 0x01; mFilterEvents[filterId].events.resize(1); mFilterEvents[filterId].events[0].ts() = recordEvent; mFilterOutputs[filterId].clear(); return Result::SUCCESS; } Loading Loading @@ -554,10 +583,7 @@ bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filte return false; } bool Demux::filterAndOutputData() { Result result; set<uint32_t>::iterator it; bool Demux::readInputFMQ() { // Read input data from the input FMQ int size = mInputMQ->availableToRead(); int inputPacketSize = mInputSettings.packetSize; Loading @@ -566,16 +592,30 @@ bool Demux::filterAndOutputData() { // Dispatch the packet to the PID matching filter output buffer for (int i = 0; i < size / inputPacketSize; i++) { mInputMQ->read(dataOutputBuffer.data(), inputPacketSize); if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) { return false; } startTsFilter(dataOutputBuffer); } return true; } void Demux::startTsFilter(vector<uint8_t> data) { set<uint32_t>::iterator it; for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) { uint16_t pid = ((dataOutputBuffer[1] & 0x1f) << 8) | ((dataOutputBuffer[2] & 0xff)); uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff)); ALOGW("start ts filter pid: %d", pid); if (pid == mFilterPids[*it]) { mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), dataOutputBuffer.begin(), dataOutputBuffer.end()); mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), data.begin(), data.end()); } } } bool Demux::startFilterDispatcher() { Result result; set<uint32_t>::iterator it; // Handle the output data per filter type for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) { switch (mFilterEvents[*it].filterType) { Loading Loading @@ -620,6 +660,7 @@ void* Demux::__threadLoopInput(void* user) { void Demux::filterThreadLoop(uint32_t filterId) { ALOGD("[Demux] filter %d threadLoop start.", filterId); std::lock_guard<std::mutex> lock(mFilterThreadLock); mFilterThreadRunning[filterId] = true; // For the first time of filter output, implementation needs to send the filter Loading Loading @@ -682,6 +723,7 @@ void Demux::filterThreadLoop(uint32_t filterId) { void Demux::inputThreadLoop() { ALOGD("[Demux] input threadLoop start."); std::lock_guard<std::mutex> lock(mInputThreadLock); mInputThreadRunning = true; while (mInputThreadRunning) { Loading @@ -695,7 +737,7 @@ void Demux::inputThreadLoop() { } // Our current implementation filter the data and write it into the filter FMQ immedaitely // after the DATA_READY from the VTS/framework if (!filterAndOutputData()) { if (!readInputFMQ() || !startFilterDispatcher()) { ALOGD("[Demux] input data failed to be filtered. Ending thread"); break; } Loading Loading @@ -735,6 +777,70 @@ DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t av return mIntputStatus; } Result Demux::startBroadcastInputLoop() { pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this); pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread"); return Result::SUCCESS; } void* Demux::__threadLoopBroadcast(void* user) { Demux* const self = static_cast<Demux*>(user); self->broadcastInputThreadLoop(); return 0; } void Demux::broadcastInputThreadLoop() { std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock); mBroadcastInputThreadRunning = true; mKeepFetchingDataFromFrontend = true; // open the stream and get its length std::ifstream inputData(mFrontendSourceFile, std::ifstream::binary); // TODO take the packet size from the frontend setting int packetSize = 188; int writePacketAmount = 6; char* buffer = new char[packetSize]; ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str()); if (!inputData.is_open()) { mBroadcastInputThreadRunning = false; ALOGW("[Demux] Error %s", strerror(errno)); } while (mBroadcastInputThreadRunning) { // move the stream pointer for packet size * 6 every read until the end while (mKeepFetchingDataFromFrontend) { for (int i = 0; i < writePacketAmount; i++) { inputData.read(buffer, packetSize); if (!inputData) { mBroadcastInputThreadRunning = false; break; } // filter and dispatch filter output vector<uint8_t> byteBuffer; byteBuffer.resize(sizeof(buffer)); for (int index = 0; index < byteBuffer.size(); index++) { byteBuffer[index] = static_cast<uint8_t>(buffer[index]); } startTsFilter(byteBuffer); inputData.seekg(packetSize, inputData.cur); } startFilterDispatcher(); sleep(1); } } ALOGW("[Demux] Broadcast Input thread end."); delete[] buffer; inputData.close(); } void Demux::stopBroadcastInput() { mKeepFetchingDataFromFrontend = false; mBroadcastInputThreadRunning = false; std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock); } } // namespace implementation } // namespace V1_0 } // namespace tuner Loading tv/tuner/1.0/default/Demux.h +28 −3 Original line number Diff line number Diff line Loading @@ -20,6 +20,8 @@ #include <android/hardware/tv/tuner/1.0/IDemux.h> #include <fmq/MessageQueue.h> #include <set> #include "Frontend.h" #include "Tuner.h" using namespace std; Loading @@ -40,9 +42,12 @@ using ::android::hardware::tv::tuner::V1_0::Result; using FilterMQ = MessageQueue<uint8_t, kSynchronizedReadWrite>; class Tuner; class Frontend; class Demux : public IDemux { public: Demux(uint32_t demuxId); Demux(uint32_t demuxId, sp<Tuner> tuner); ~Demux(); Loading Loading @@ -103,7 +108,17 @@ class Demux : public IDemux { virtual Return<Result> removeOutput() override; // Functions interacts with Tuner Service void stopBroadcastInput(); private: // Tuner service sp<Tuner> mTunerService; // Frontend source sp<Frontend> mFrontend; string mFrontendSourceFile; // A struct that passes the arguments to a newly created filter thread struct ThreadArgs { Demux* user; Loading @@ -122,6 +137,7 @@ class Demux : public IDemux { Result startRecordFilterHandler(uint32_t filterId); Result startPcrFilterHandler(); Result startFilterLoop(uint32_t filterId); Result startBroadcastInputLoop(); /** * To create a FilterMQ with the the next available Filter ID. Loading @@ -143,14 +159,17 @@ class Demux : public IDemux { * A dispatcher to read and dispatch input data to all the started filters. * Each filter handler handles the data filtering/output writing/filterEvent updating. */ bool filterAndOutputData(); bool readInputFMQ(); void startTsFilter(vector<uint8_t> data); bool startFilterDispatcher(); static void* __threadLoopFilter(void* data); static void* __threadLoopInput(void* user); static void* __threadLoopBroadcast(void* user); void filterThreadLoop(uint32_t filterId); void inputThreadLoop(); void broadcastInputThreadLoop(); uint32_t mDemuxId; uint32_t mSourceFrontendId; /** * Record the last used filter id. Initial value is -1. * Filter Id starts with 0. Loading Loading @@ -195,6 +214,7 @@ class Demux : public IDemux { // Thread handlers pthread_t mInputThread; pthread_t mOutputThread; pthread_t mBroadcastInputThread; vector<pthread_t> mFilterThreads; // FMQ status local records Loading @@ -204,6 +224,8 @@ class Demux : public IDemux { */ vector<bool> mFilterThreadRunning; bool mInputThreadRunning; bool mBroadcastInputThreadRunning; bool mKeepFetchingDataFromFrontend; /** * Lock to protect writes to the FMQs */ Loading @@ -217,6 +239,9 @@ class Demux : public IDemux { * Lock to protect writes to the input status */ std::mutex mInputStatusLock; std::mutex mBroadcastInputThreadLock; std::mutex mFilterThreadLock; std::mutex mInputThreadLock; /** * How many times a filter should write * TODO make this dynamic/random/can take as a parameter Loading tv/tuner/1.0/default/Frontend.cpp +12 −7 Original line number Diff line number Diff line Loading @@ -27,14 +27,10 @@ namespace tuner { namespace V1_0 { namespace implementation { Frontend::Frontend() { // Init callback to nullptr mCallback = nullptr; } Frontend::Frontend(FrontendType type, FrontendId id) { Frontend::Frontend(FrontendType type, FrontendId id, sp<Tuner> tuner) { mType = type; mId = id; mTunerService = tuner; // Init callback to nullptr mCallback = nullptr; } Loading Loading @@ -67,13 +63,18 @@ Return<Result> Frontend::tune(const FrontendSettings& /* settings */) { return Result::INVALID_STATE; } mCallback->onEvent(FrontendEventType::NO_SIGNAL); // TODO dynamically allocate file to the source file mSourceStreamFile = FRONTEND_STREAM_FILE; mCallback->onEvent(FrontendEventType::LOCKED); return Result::SUCCESS; } Return<Result> Frontend::stopTune() { ALOGV("%s", __FUNCTION__); mTunerService->frontendStopTune(mId); return Result::SUCCESS; } Loading Loading @@ -119,6 +120,10 @@ FrontendId Frontend::getFrontendId() { return mId; } string Frontend::getSourceFile() { return mSourceStreamFile; } } // namespace implementation } // namespace V1_0 } // namespace tuner Loading tv/tuner/1.0/default/Frontend.h +13 −4 Original line number Diff line number Diff line Loading @@ -18,7 +18,9 @@ #define ANDROID_HARDWARE_TV_TUNER_V1_0_FRONTEND_H_ #include <android/hardware/tv/tuner/1.0/IFrontend.h> #include <android/hardware/tv/tuner/1.0/ITuner.h> #include <fstream> #include <iostream> #include "Tuner.h" using namespace std; Loading @@ -35,11 +37,11 @@ using ::android::hardware::tv::tuner::V1_0::IFrontend; using ::android::hardware::tv::tuner::V1_0::IFrontendCallback; using ::android::hardware::tv::tuner::V1_0::Result; class Tuner; class Frontend : public IFrontend { public: Frontend(); Frontend(FrontendType type, FrontendId id); Frontend(FrontendType type, FrontendId id, sp<Tuner> tuner); virtual Return<Result> close() override; Loading @@ -64,11 +66,18 @@ class Frontend : public IFrontend { FrontendId getFrontendId(); string getSourceFile(); private: virtual ~Frontend(); sp<IFrontendCallback> mCallback; sp<Tuner> mTunerService; FrontendType mType = FrontendType::UNDEFINED; FrontendId mId = 0; const string FRONTEND_STREAM_FILE = "/vendor/etc/test1.ts"; string mSourceStreamFile; std::ifstream mFrontendData; }; } // namespace implementation Loading tv/tuner/1.0/default/Tuner.cpp +29 −9 Original line number Diff line number Diff line Loading @@ -38,14 +38,14 @@ Tuner::Tuner() { // Array index matches their FrontendId in the default impl mFrontendSize = 8; mFrontends.resize(mFrontendSize); mFrontends[0] = new Frontend(); mFrontends[1] = new Frontend(FrontendType::ATSC, 1); mFrontends[2] = new Frontend(FrontendType::DVBC, 2); mFrontends[3] = new Frontend(FrontendType::DVBS, 3); mFrontends[4] = new Frontend(FrontendType::DVBT, 4); mFrontends[5] = new Frontend(FrontendType::ISDBT, 5); mFrontends[6] = new Frontend(FrontendType::ANALOG, 6); mFrontends[7] = new Frontend(FrontendType::ATSC, 7); mFrontends[0] = new Frontend(FrontendType::DVBT, 0, this); mFrontends[1] = new Frontend(FrontendType::ATSC, 1, this); mFrontends[2] = new Frontend(FrontendType::DVBC, 2, this); mFrontends[3] = new Frontend(FrontendType::DVBS, 3, this); mFrontends[4] = new Frontend(FrontendType::DVBT, 4, this); mFrontends[5] = new Frontend(FrontendType::ISDBT, 5, this); mFrontends[6] = new Frontend(FrontendType::ANALOG, 6, this); mFrontends[7] = new Frontend(FrontendType::ATSC, 7, this); } Tuner::~Tuner() {} Loading Loading @@ -81,7 +81,8 @@ Return<void> Tuner::openDemux(openDemux_cb _hidl_cb) { DemuxId demuxId = mLastUsedId + 1; mLastUsedId += 1; sp<IDemux> demux = new Demux(demuxId); sp<Demux> demux = new Demux(demuxId, this); mDemuxes[demuxId] = demux; _hidl_cb(Result::SUCCESS, demuxId, demux); return Void(); Loading Loading @@ -132,6 +133,25 @@ Return<void> Tuner::openLnbById(LnbId /* lnbId */, openLnbById_cb _hidl_cb) { return Void(); } sp<Frontend> Tuner::getFrontendById(uint32_t frontendId) { ALOGV("%s", __FUNCTION__); return mFrontends[frontendId]; } void Tuner::setFrontendAsDemuxSource(uint32_t frontendId, uint32_t demuxId) { mFrontendToDemux[frontendId] = demuxId; } void Tuner::frontendStopTune(uint32_t frontendId) { map<uint32_t, uint32_t>::iterator it = mFrontendToDemux.find(frontendId); uint32_t demuxId; if (it != mFrontendToDemux.end()) { demuxId = it->second; mDemuxes[demuxId]->stopBroadcastInput(); } } } // namespace implementation } // namespace V1_0 } // namespace tuner Loading Loading
tv/tuner/1.0/default/Demux.cpp +133 −27 Original line number Diff line number Diff line Loading @@ -67,8 +67,9 @@ const std::vector<uint8_t> fakeDataInputBuffer{ 0x73, 0x63, 0x65, 0x6e, 0x65, }; Demux::Demux(uint32_t demuxId) { Demux::Demux(uint32_t demuxId, sp<Tuner> tuner) { mDemuxId = demuxId; mTunerService = tuner; } Demux::~Demux() {} Loading @@ -76,9 +77,20 @@ Demux::~Demux() {} Return<Result> Demux::setFrontendDataSource(uint32_t frontendId) { ALOGV("%s", __FUNCTION__); mSourceFrontendId = frontendId; if (mTunerService == nullptr) { return Result::NOT_INITIALIZED; } return Result::SUCCESS; mFrontend = mTunerService->getFrontendById(frontendId); if (mFrontend == nullptr) { return Result::INVALID_STATE; } mFrontendSourceFile = mFrontend->getSourceFile(); mTunerService->setFrontendAsDemuxSource(frontendId, mDemuxId); return startBroadcastInputLoop(); } Return<void> Demux::addFilter(DemuxFilterType type, uint32_t bufferSize, Loading Loading @@ -194,6 +206,8 @@ Return<Result> Demux::stopFilter(uint32_t filterId) { mFilterThreadRunning[filterId] = false; std::lock_guard<std::mutex> lock(mFilterThreadLock); return Result::SUCCESS; } Loading Loading @@ -396,6 +410,8 @@ Return<Result> Demux::stopInput() { mInputThreadRunning = false; std::lock_guard<std::mutex> lock(mInputThreadLock); return Result::SUCCESS; } Loading Loading @@ -447,19 +463,28 @@ Result Demux::startPesFilterHandler(uint32_t filterId) { return Result::SUCCESS; } // TODO extract PES from TS if (!writeDataToFilterMQ(mFilterOutputs[filterId], filterId)) { for (int i = 0; i < mFilterOutputs[filterId].size(); i += 188) { uint8_t pusi = mFilterOutputs[filterId][i + 1] & 0x40; uint8_t adaptFieldControl = (mFilterOutputs[filterId][i + 3] & 0x30) >> 4; ALOGD("[Demux] pusi %d, adaptFieldControl %d", pusi, adaptFieldControl); if (pusi && (adaptFieldControl == 0x01)) { vector<uint8_t>::const_iterator first = mFilterOutputs[filterId].begin() + i + 4; vector<uint8_t>::const_iterator last = mFilterOutputs[filterId].begin() + i + 187; vector<uint8_t> filterOutData(first, last); if (!writeDataToFilterMQ(filterOutData, filterId)) { mFilterOutputs[filterId].clear(); return Result::INVALID_STATE; } pesEvent = { // temp dump meta data .streamId = 0, .dataLength = static_cast<uint16_t>(mFilterOutputs[filterId].size()), .streamId = filterOutData[3], .dataLength = static_cast<uint16_t>(filterOutData.size()), }; int size = mFilterEvents[filterId].events.size(); mFilterEvents[filterId].events.resize(size + 1); mFilterEvents[filterId].events[size].pes(pesEvent); } } mFilterOutputs[filterId].clear(); Loading @@ -481,6 +506,8 @@ Result Demux::startMediaFilterHandler(uint32_t filterId) { }; mFilterEvents[filterId].events.resize(1); mFilterEvents[filterId].events[0].media() = mediaEvent; mFilterOutputs[filterId].clear(); // TODO handle write FQM for media stream return Result::SUCCESS; } Loading @@ -495,6 +522,8 @@ Result Demux::startRecordFilterHandler(uint32_t filterId) { recordEvent.indexMask.tsIndexMask() = 0x01; mFilterEvents[filterId].events.resize(1); mFilterEvents[filterId].events[0].ts() = recordEvent; mFilterOutputs[filterId].clear(); return Result::SUCCESS; } Loading Loading @@ -554,10 +583,7 @@ bool Demux::writeDataToFilterMQ(const std::vector<uint8_t>& data, uint32_t filte return false; } bool Demux::filterAndOutputData() { Result result; set<uint32_t>::iterator it; bool Demux::readInputFMQ() { // Read input data from the input FMQ int size = mInputMQ->availableToRead(); int inputPacketSize = mInputSettings.packetSize; Loading @@ -566,16 +592,30 @@ bool Demux::filterAndOutputData() { // Dispatch the packet to the PID matching filter output buffer for (int i = 0; i < size / inputPacketSize; i++) { mInputMQ->read(dataOutputBuffer.data(), inputPacketSize); if (!mInputMQ->read(dataOutputBuffer.data(), inputPacketSize)) { return false; } startTsFilter(dataOutputBuffer); } return true; } void Demux::startTsFilter(vector<uint8_t> data) { set<uint32_t>::iterator it; for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) { uint16_t pid = ((dataOutputBuffer[1] & 0x1f) << 8) | ((dataOutputBuffer[2] & 0xff)); uint16_t pid = ((data[1] & 0x1f) << 8) | ((data[2] & 0xff)); ALOGW("start ts filter pid: %d", pid); if (pid == mFilterPids[*it]) { mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), dataOutputBuffer.begin(), dataOutputBuffer.end()); mFilterOutputs[*it].insert(mFilterOutputs[*it].end(), data.begin(), data.end()); } } } bool Demux::startFilterDispatcher() { Result result; set<uint32_t>::iterator it; // Handle the output data per filter type for (it = mUsedFilterIds.begin(); it != mUsedFilterIds.end(); it++) { switch (mFilterEvents[*it].filterType) { Loading Loading @@ -620,6 +660,7 @@ void* Demux::__threadLoopInput(void* user) { void Demux::filterThreadLoop(uint32_t filterId) { ALOGD("[Demux] filter %d threadLoop start.", filterId); std::lock_guard<std::mutex> lock(mFilterThreadLock); mFilterThreadRunning[filterId] = true; // For the first time of filter output, implementation needs to send the filter Loading Loading @@ -682,6 +723,7 @@ void Demux::filterThreadLoop(uint32_t filterId) { void Demux::inputThreadLoop() { ALOGD("[Demux] input threadLoop start."); std::lock_guard<std::mutex> lock(mInputThreadLock); mInputThreadRunning = true; while (mInputThreadRunning) { Loading @@ -695,7 +737,7 @@ void Demux::inputThreadLoop() { } // Our current implementation filter the data and write it into the filter FMQ immedaitely // after the DATA_READY from the VTS/framework if (!filterAndOutputData()) { if (!readInputFMQ() || !startFilterDispatcher()) { ALOGD("[Demux] input data failed to be filtered. Ending thread"); break; } Loading Loading @@ -735,6 +777,70 @@ DemuxInputStatus Demux::checkStatusChange(uint32_t availableToWrite, uint32_t av return mIntputStatus; } Result Demux::startBroadcastInputLoop() { pthread_create(&mBroadcastInputThread, NULL, __threadLoopBroadcast, this); pthread_setname_np(mBroadcastInputThread, "broadcast_input_thread"); return Result::SUCCESS; } void* Demux::__threadLoopBroadcast(void* user) { Demux* const self = static_cast<Demux*>(user); self->broadcastInputThreadLoop(); return 0; } void Demux::broadcastInputThreadLoop() { std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock); mBroadcastInputThreadRunning = true; mKeepFetchingDataFromFrontend = true; // open the stream and get its length std::ifstream inputData(mFrontendSourceFile, std::ifstream::binary); // TODO take the packet size from the frontend setting int packetSize = 188; int writePacketAmount = 6; char* buffer = new char[packetSize]; ALOGW("[Demux] broadcast input thread loop start %s", mFrontendSourceFile.c_str()); if (!inputData.is_open()) { mBroadcastInputThreadRunning = false; ALOGW("[Demux] Error %s", strerror(errno)); } while (mBroadcastInputThreadRunning) { // move the stream pointer for packet size * 6 every read until the end while (mKeepFetchingDataFromFrontend) { for (int i = 0; i < writePacketAmount; i++) { inputData.read(buffer, packetSize); if (!inputData) { mBroadcastInputThreadRunning = false; break; } // filter and dispatch filter output vector<uint8_t> byteBuffer; byteBuffer.resize(sizeof(buffer)); for (int index = 0; index < byteBuffer.size(); index++) { byteBuffer[index] = static_cast<uint8_t>(buffer[index]); } startTsFilter(byteBuffer); inputData.seekg(packetSize, inputData.cur); } startFilterDispatcher(); sleep(1); } } ALOGW("[Demux] Broadcast Input thread end."); delete[] buffer; inputData.close(); } void Demux::stopBroadcastInput() { mKeepFetchingDataFromFrontend = false; mBroadcastInputThreadRunning = false; std::lock_guard<std::mutex> lock(mBroadcastInputThreadLock); } } // namespace implementation } // namespace V1_0 } // namespace tuner Loading
tv/tuner/1.0/default/Demux.h +28 −3 Original line number Diff line number Diff line Loading @@ -20,6 +20,8 @@ #include <android/hardware/tv/tuner/1.0/IDemux.h> #include <fmq/MessageQueue.h> #include <set> #include "Frontend.h" #include "Tuner.h" using namespace std; Loading @@ -40,9 +42,12 @@ using ::android::hardware::tv::tuner::V1_0::Result; using FilterMQ = MessageQueue<uint8_t, kSynchronizedReadWrite>; class Tuner; class Frontend; class Demux : public IDemux { public: Demux(uint32_t demuxId); Demux(uint32_t demuxId, sp<Tuner> tuner); ~Demux(); Loading Loading @@ -103,7 +108,17 @@ class Demux : public IDemux { virtual Return<Result> removeOutput() override; // Functions interacts with Tuner Service void stopBroadcastInput(); private: // Tuner service sp<Tuner> mTunerService; // Frontend source sp<Frontend> mFrontend; string mFrontendSourceFile; // A struct that passes the arguments to a newly created filter thread struct ThreadArgs { Demux* user; Loading @@ -122,6 +137,7 @@ class Demux : public IDemux { Result startRecordFilterHandler(uint32_t filterId); Result startPcrFilterHandler(); Result startFilterLoop(uint32_t filterId); Result startBroadcastInputLoop(); /** * To create a FilterMQ with the the next available Filter ID. Loading @@ -143,14 +159,17 @@ class Demux : public IDemux { * A dispatcher to read and dispatch input data to all the started filters. * Each filter handler handles the data filtering/output writing/filterEvent updating. */ bool filterAndOutputData(); bool readInputFMQ(); void startTsFilter(vector<uint8_t> data); bool startFilterDispatcher(); static void* __threadLoopFilter(void* data); static void* __threadLoopInput(void* user); static void* __threadLoopBroadcast(void* user); void filterThreadLoop(uint32_t filterId); void inputThreadLoop(); void broadcastInputThreadLoop(); uint32_t mDemuxId; uint32_t mSourceFrontendId; /** * Record the last used filter id. Initial value is -1. * Filter Id starts with 0. Loading Loading @@ -195,6 +214,7 @@ class Demux : public IDemux { // Thread handlers pthread_t mInputThread; pthread_t mOutputThread; pthread_t mBroadcastInputThread; vector<pthread_t> mFilterThreads; // FMQ status local records Loading @@ -204,6 +224,8 @@ class Demux : public IDemux { */ vector<bool> mFilterThreadRunning; bool mInputThreadRunning; bool mBroadcastInputThreadRunning; bool mKeepFetchingDataFromFrontend; /** * Lock to protect writes to the FMQs */ Loading @@ -217,6 +239,9 @@ class Demux : public IDemux { * Lock to protect writes to the input status */ std::mutex mInputStatusLock; std::mutex mBroadcastInputThreadLock; std::mutex mFilterThreadLock; std::mutex mInputThreadLock; /** * How many times a filter should write * TODO make this dynamic/random/can take as a parameter Loading
tv/tuner/1.0/default/Frontend.cpp +12 −7 Original line number Diff line number Diff line Loading @@ -27,14 +27,10 @@ namespace tuner { namespace V1_0 { namespace implementation { Frontend::Frontend() { // Init callback to nullptr mCallback = nullptr; } Frontend::Frontend(FrontendType type, FrontendId id) { Frontend::Frontend(FrontendType type, FrontendId id, sp<Tuner> tuner) { mType = type; mId = id; mTunerService = tuner; // Init callback to nullptr mCallback = nullptr; } Loading Loading @@ -67,13 +63,18 @@ Return<Result> Frontend::tune(const FrontendSettings& /* settings */) { return Result::INVALID_STATE; } mCallback->onEvent(FrontendEventType::NO_SIGNAL); // TODO dynamically allocate file to the source file mSourceStreamFile = FRONTEND_STREAM_FILE; mCallback->onEvent(FrontendEventType::LOCKED); return Result::SUCCESS; } Return<Result> Frontend::stopTune() { ALOGV("%s", __FUNCTION__); mTunerService->frontendStopTune(mId); return Result::SUCCESS; } Loading Loading @@ -119,6 +120,10 @@ FrontendId Frontend::getFrontendId() { return mId; } string Frontend::getSourceFile() { return mSourceStreamFile; } } // namespace implementation } // namespace V1_0 } // namespace tuner Loading
tv/tuner/1.0/default/Frontend.h +13 −4 Original line number Diff line number Diff line Loading @@ -18,7 +18,9 @@ #define ANDROID_HARDWARE_TV_TUNER_V1_0_FRONTEND_H_ #include <android/hardware/tv/tuner/1.0/IFrontend.h> #include <android/hardware/tv/tuner/1.0/ITuner.h> #include <fstream> #include <iostream> #include "Tuner.h" using namespace std; Loading @@ -35,11 +37,11 @@ using ::android::hardware::tv::tuner::V1_0::IFrontend; using ::android::hardware::tv::tuner::V1_0::IFrontendCallback; using ::android::hardware::tv::tuner::V1_0::Result; class Tuner; class Frontend : public IFrontend { public: Frontend(); Frontend(FrontendType type, FrontendId id); Frontend(FrontendType type, FrontendId id, sp<Tuner> tuner); virtual Return<Result> close() override; Loading @@ -64,11 +66,18 @@ class Frontend : public IFrontend { FrontendId getFrontendId(); string getSourceFile(); private: virtual ~Frontend(); sp<IFrontendCallback> mCallback; sp<Tuner> mTunerService; FrontendType mType = FrontendType::UNDEFINED; FrontendId mId = 0; const string FRONTEND_STREAM_FILE = "/vendor/etc/test1.ts"; string mSourceStreamFile; std::ifstream mFrontendData; }; } // namespace implementation Loading
tv/tuner/1.0/default/Tuner.cpp +29 −9 Original line number Diff line number Diff line Loading @@ -38,14 +38,14 @@ Tuner::Tuner() { // Array index matches their FrontendId in the default impl mFrontendSize = 8; mFrontends.resize(mFrontendSize); mFrontends[0] = new Frontend(); mFrontends[1] = new Frontend(FrontendType::ATSC, 1); mFrontends[2] = new Frontend(FrontendType::DVBC, 2); mFrontends[3] = new Frontend(FrontendType::DVBS, 3); mFrontends[4] = new Frontend(FrontendType::DVBT, 4); mFrontends[5] = new Frontend(FrontendType::ISDBT, 5); mFrontends[6] = new Frontend(FrontendType::ANALOG, 6); mFrontends[7] = new Frontend(FrontendType::ATSC, 7); mFrontends[0] = new Frontend(FrontendType::DVBT, 0, this); mFrontends[1] = new Frontend(FrontendType::ATSC, 1, this); mFrontends[2] = new Frontend(FrontendType::DVBC, 2, this); mFrontends[3] = new Frontend(FrontendType::DVBS, 3, this); mFrontends[4] = new Frontend(FrontendType::DVBT, 4, this); mFrontends[5] = new Frontend(FrontendType::ISDBT, 5, this); mFrontends[6] = new Frontend(FrontendType::ANALOG, 6, this); mFrontends[7] = new Frontend(FrontendType::ATSC, 7, this); } Tuner::~Tuner() {} Loading Loading @@ -81,7 +81,8 @@ Return<void> Tuner::openDemux(openDemux_cb _hidl_cb) { DemuxId demuxId = mLastUsedId + 1; mLastUsedId += 1; sp<IDemux> demux = new Demux(demuxId); sp<Demux> demux = new Demux(demuxId, this); mDemuxes[demuxId] = demux; _hidl_cb(Result::SUCCESS, demuxId, demux); return Void(); Loading Loading @@ -132,6 +133,25 @@ Return<void> Tuner::openLnbById(LnbId /* lnbId */, openLnbById_cb _hidl_cb) { return Void(); } sp<Frontend> Tuner::getFrontendById(uint32_t frontendId) { ALOGV("%s", __FUNCTION__); return mFrontends[frontendId]; } void Tuner::setFrontendAsDemuxSource(uint32_t frontendId, uint32_t demuxId) { mFrontendToDemux[frontendId] = demuxId; } void Tuner::frontendStopTune(uint32_t frontendId) { map<uint32_t, uint32_t>::iterator it = mFrontendToDemux.find(frontendId); uint32_t demuxId; if (it != mFrontendToDemux.end()) { demuxId = it->second; mDemuxes[demuxId]->stopBroadcastInput(); } } } // namespace implementation } // namespace V1_0 } // namespace tuner Loading