Loading tv/tuner/aidl/default/Demux.cpp +19 −14 Original line number Diff line number Diff line Loading @@ -53,6 +53,9 @@ void Demux::setTunerService(std::shared_ptr<Tuner> tuner) { Demux::~Demux() { ALOGV("%s", __FUNCTION__); if (mDemuxIptvReadThread.joinable()) { mDemuxIptvReadThread.join(); } close(); } Loading Loading @@ -114,16 +117,26 @@ Demux::~Demux() { } } void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, size_t buf_size, int timeout_ms, int buffer_timeout) { void Demux::setIptvThreadRunning(bool isIptvThreadRunning) { std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex); mIsIptvReadThreadRunning = isIptvThreadRunning; mIsIptvThreadRunningCv.notify_all(); } void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size, int timeout_ms, int buffer_timeout) { Timer *timer, *fullBufferTimer; while (mDemuxIptvReadThreadRunning) { while (true) { std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex); mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; }); if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) { ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout); delete fullBufferTimer; break; } timer = new Timer(); void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); if (buf == nullptr) ALOGI("Buffer allocation failed"); ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms); if (bytes_read == 0) { double elapsed_time = timer->get_elapsed_time_ms(); Loading Loading @@ -157,8 +170,9 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, vo default: ALOGI("Invalid DVR Status"); } free(buf); } mDemuxIptvReadThreadRunning = false; } ::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) { Loading Loading @@ -216,17 +230,8 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, vo // while thread is alive, keep reading data int timeout_ms = 20; int buffer_timeout = 10000; // 10s void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); if (buf == nullptr) ALOGI("malloc buf failed"); ALOGI("[ INFO ] Allocated buffer of size %d", IPTV_BUFFER_SIZE); ALOGI("Getting FMQ from DVR instance to write socket data"); mDemuxIptvReadThreadRunning = true; mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer, buf, IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout); if (mDemuxIptvReadThread.joinable()) { mDemuxIptvReadThread.join(); } free(buf); IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout); } return ::ndk::ScopedAStatus::ok(); } Loading tv/tuner/aidl/default/Demux.h +13 −2 Original line number Diff line number Diff line Loading @@ -103,7 +103,7 @@ class Demux : public BnDemux { void setIsRecording(bool isRecording); bool isRecording(); void startFrontendInputLoop(); void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, size_t size, void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size, int timeout_ms, int buffer_timeout); /** Loading @@ -124,6 +124,11 @@ class Demux : public BnDemux { void setInUse(bool inUse); void setTunerService(std::shared_ptr<Tuner> tuner); /** * Setter for IPTV Reading thread */ void setIptvThreadRunning(bool isIptvThreadRunning); private: // Tuner service std::shared_ptr<Tuner> mTuner; Loading Loading @@ -196,9 +201,15 @@ class Demux : public BnDemux { * If a specific filter's writing loop is still running */ std::atomic<bool> mFrontendInputThreadRunning; std::atomic<bool> mDemuxIptvReadThreadRunning; std::atomic<bool> mKeepFetchingDataFromFrontend; /** * Controls IPTV reading thread status */ bool mIsIptvReadThreadRunning; std::mutex mIsIptvThreadRunningMutex; std::condition_variable mIsIptvThreadRunningCv; /** * If the dvr recording is running. */ Loading tv/tuner/aidl/default/Filter.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -328,6 +328,8 @@ Filter::~Filter() { std::vector<DemuxFilterEvent> events; mFilterCount += 1; mDemux->setIptvThreadRunning(true); // All the filter event callbacks in start are for testing purpose. switch (mType.mainType) { case DemuxFilterMainType::TS: Loading Loading @@ -365,6 +367,9 @@ Filter::~Filter() { ALOGV("%s", __FUNCTION__); mFilterCount -= 1; if (mFilterCount == 0) { mDemux->setIptvThreadRunning(false); } mFilterThreadRunning = false; if (mFilterThread.joinable()) { Loading Loading
tv/tuner/aidl/default/Demux.cpp +19 −14 Original line number Diff line number Diff line Loading @@ -53,6 +53,9 @@ void Demux::setTunerService(std::shared_ptr<Tuner> tuner) { Demux::~Demux() { ALOGV("%s", __FUNCTION__); if (mDemuxIptvReadThread.joinable()) { mDemuxIptvReadThread.join(); } close(); } Loading Loading @@ -114,16 +117,26 @@ Demux::~Demux() { } } void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, size_t buf_size, int timeout_ms, int buffer_timeout) { void Demux::setIptvThreadRunning(bool isIptvThreadRunning) { std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex); mIsIptvReadThreadRunning = isIptvThreadRunning; mIsIptvThreadRunningCv.notify_all(); } void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size, int timeout_ms, int buffer_timeout) { Timer *timer, *fullBufferTimer; while (mDemuxIptvReadThreadRunning) { while (true) { std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex); mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; }); if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) { ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout); delete fullBufferTimer; break; } timer = new Timer(); void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); if (buf == nullptr) ALOGI("Buffer allocation failed"); ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms); if (bytes_read == 0) { double elapsed_time = timer->get_elapsed_time_ms(); Loading Loading @@ -157,8 +170,9 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, vo default: ALOGI("Invalid DVR Status"); } free(buf); } mDemuxIptvReadThreadRunning = false; } ::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) { Loading Loading @@ -216,17 +230,8 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, vo // while thread is alive, keep reading data int timeout_ms = 20; int buffer_timeout = 10000; // 10s void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE); if (buf == nullptr) ALOGI("malloc buf failed"); ALOGI("[ INFO ] Allocated buffer of size %d", IPTV_BUFFER_SIZE); ALOGI("Getting FMQ from DVR instance to write socket data"); mDemuxIptvReadThreadRunning = true; mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer, buf, IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout); if (mDemuxIptvReadThread.joinable()) { mDemuxIptvReadThread.join(); } free(buf); IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout); } return ::ndk::ScopedAStatus::ok(); } Loading
tv/tuner/aidl/default/Demux.h +13 −2 Original line number Diff line number Diff line Loading @@ -103,7 +103,7 @@ class Demux : public BnDemux { void setIsRecording(bool isRecording); bool isRecording(); void startFrontendInputLoop(); void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, size_t size, void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size, int timeout_ms, int buffer_timeout); /** Loading @@ -124,6 +124,11 @@ class Demux : public BnDemux { void setInUse(bool inUse); void setTunerService(std::shared_ptr<Tuner> tuner); /** * Setter for IPTV Reading thread */ void setIptvThreadRunning(bool isIptvThreadRunning); private: // Tuner service std::shared_ptr<Tuner> mTuner; Loading Loading @@ -196,9 +201,15 @@ class Demux : public BnDemux { * If a specific filter's writing loop is still running */ std::atomic<bool> mFrontendInputThreadRunning; std::atomic<bool> mDemuxIptvReadThreadRunning; std::atomic<bool> mKeepFetchingDataFromFrontend; /** * Controls IPTV reading thread status */ bool mIsIptvReadThreadRunning; std::mutex mIsIptvThreadRunningMutex; std::condition_variable mIsIptvThreadRunningCv; /** * If the dvr recording is running. */ Loading
tv/tuner/aidl/default/Filter.cpp +5 −0 Original line number Diff line number Diff line Loading @@ -328,6 +328,8 @@ Filter::~Filter() { std::vector<DemuxFilterEvent> events; mFilterCount += 1; mDemux->setIptvThreadRunning(true); // All the filter event callbacks in start are for testing purpose. switch (mType.mainType) { case DemuxFilterMainType::TS: Loading Loading @@ -365,6 +367,9 @@ Filter::~Filter() { ALOGV("%s", __FUNCTION__); mFilterCount -= 1; if (mFilterCount == 0) { mDemux->setIptvThreadRunning(false); } mFilterThreadRunning = false; if (mFilterThread.joinable()) { Loading