Loading tv/tuner/aidl/default/Filter.cpp +196 −80 Original line number Diff line number Diff line Loading @@ -34,16 +34,132 @@ namespace tuner { #define WAIT_TIMEOUT 3000000000 Filter::Filter() {} FilterCallbackScheduler::FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb) : mCallback(cb), mDataLength(0), mTimeDelayInMs(0), mDataSizeDelayInBytes(0) { start(); } Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize, const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux) { mType = type; mFilterId = filterId; mBufferSize = bufferSize; mDemux = demux; mCallback = cb; FilterCallbackScheduler::~FilterCallbackScheduler() { stop(); } void FilterCallbackScheduler::onFilterEvent(DemuxFilterEvent&& event) { std::lock_guard<std::mutex> lock(mLock); mCallbackBuffer.push_back(std::move(event)); mDataLength += getDemuxFilterEventDataLength(event); if (mDataLength >= mDataSizeDelayInBytes) { // size limit has been reached, send out events mCv.notify_all(); } } void FilterCallbackScheduler::onFilterStatus(const DemuxFilterStatus& status) { if (mCallback) { mCallback->onFilterStatus(status); } } void FilterCallbackScheduler::setTimeDelayHint(int timeDelay) { // updating the setTimeDelay does not go into effect until the condition // variable times out or is notified. // One possibility is to notify the condition variable right away when the // time delay changes, but I don't see the benefit over waiting for the next // timeout / push, since -- in any case -- this will not change very often. mTimeDelayInMs = timeDelay; } void FilterCallbackScheduler::setDataSizeDelayHint(int dataSizeDelay) { // similar to updating the time delay hint, when mDataSizeDelayInBytes // changes, this will not go into immediate effect, but will wait until the // next filterEvent. // We could technically check the current data length and notify the // condition variable if we wanted to, but again, this may be overkill. mDataSizeDelayInBytes = dataSizeDelay; } bool FilterCallbackScheduler::hasCallbackRegistered() const { return mCallback != nullptr; } void FilterCallbackScheduler::start() { mIsRunning = true; mCallbackThread = std::thread(&FilterCallbackScheduler::threadLoop, this); } void FilterCallbackScheduler::stop() { mIsRunning = false; if (mCallbackThread.joinable()) { mCallbackThread.join(); } } void FilterCallbackScheduler::threadLoop() { while (mIsRunning) { threadLoopOnce(); } } void FilterCallbackScheduler::threadLoopOnce() { std::unique_lock<std::mutex> lock(mLock); // mTimeDelayInMs is an atomic value, so it should be copied into a local // variable before use (to make sure both the if statement and wait_for use // the same value). int timeDelayInMs = mTimeDelayInMs; if (timeDelayInMs > 0) { mCv.wait_for(lock, std::chrono::milliseconds(timeDelayInMs)); } else { // no reason to timeout, just wait until main thread determines it's // okay to send data. mCv.wait(lock); } // condition_variable wait locks mutex on timeout / notify if (!mCallbackBuffer.empty()) { if (mCallback) { mCallback->onFilterEvent(mCallbackBuffer); } mCallbackBuffer.clear(); mDataLength = 0; } lock.unlock(); } int FilterCallbackScheduler::getDemuxFilterEventDataLength(const DemuxFilterEvent& event) { // there is a risk that dataLength could be a negative value, but it // *should* be safe to assume that it is always positive. switch (event.getTag()) { case DemuxFilterEvent::Tag::section: return event.get<DemuxFilterEvent::Tag::section>().dataLength; case DemuxFilterEvent::Tag::media: return event.get<DemuxFilterEvent::Tag::media>().dataLength; case DemuxFilterEvent::Tag::pes: return event.get<DemuxFilterEvent::Tag::pes>().dataLength; case DemuxFilterEvent::Tag::download: return event.get<DemuxFilterEvent::Tag::download>().dataLength; case DemuxFilterEvent::Tag::ipPayload: return event.get<DemuxFilterEvent::Tag::ipPayload>().dataLength; case DemuxFilterEvent::Tag::tsRecord: case DemuxFilterEvent::Tag::mmtpRecord: case DemuxFilterEvent::Tag::temi: case DemuxFilterEvent::Tag::monitorEvent: case DemuxFilterEvent::Tag::startId: // these events do not include a payload and should therefore return // 0. // do not add a default option, so this will not compile when new types // are added. return 0; } } Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize, const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux) : mDemux(demux), mCallbackScheduler(cb), mFilterId(filterId), mBufferSize(bufferSize), mType(type) { switch (mType.mainType) { case DemuxFilterMainType::TS: if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() == Loading Loading @@ -112,9 +228,29 @@ Filter::~Filter() { } ::ndk::ScopedAStatus Filter::setDelayHint(const FilterDelayHint& in_hint) { if (mIsMediaFilter) { // delay hint is not supported for media filters return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::UNAVAILABLE)); } ALOGV("%s", __FUNCTION__); (void)in_hint; // TODO: implement if (in_hint.hintValue < 0) { return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::INVALID_ARGUMENT)); } switch (in_hint.hintType) { case FilterDelayHintType::TIME_DELAY_IN_MS: mCallbackScheduler.setTimeDelayHint(in_hint.hintValue); break; case FilterDelayHintType::DATA_SIZE_DELAY_IN_BYTES: mCallbackScheduler.setDataSizeDelayHint(in_hint.hintValue); break; default: return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::INVALID_ARGUMENT)); } return ::ndk::ScopedAStatus::ok(); } Loading Loading @@ -155,40 +291,36 @@ Filter::~Filter() { ::ndk::ScopedAStatus Filter::start() { ALOGV("%s", __FUNCTION__); mFilterThreadRunning = true; vector<DemuxFilterEvent> events; std::vector<DemuxFilterEvent> events; // All the filter event callbacks in start are for testing purpose. switch (mType.mainType) { case DemuxFilterMainType::TS: createMediaEvent(events); mCallback->onFilterEvent(events); createTsRecordEvent(events); mCallback->onFilterEvent(events); createTemiEvent(events); mCallback->onFilterEvent(events); break; case DemuxFilterMainType::MMTP: createDownloadEvent(events); mCallback->onFilterEvent(events); createMmtpRecordEvent(events); mCallback->onFilterEvent(events); break; case DemuxFilterMainType::IP: createSectionEvent(events); mCallback->onFilterEvent(events); createIpPayloadEvent(events); mCallback->onFilterEvent(events); break; case DemuxFilterMainType::TLV: createMonitorEvent(events); mCallback->onFilterEvent(events); break; case DemuxFilterMainType::ALP: createMonitorEvent(events); mCallback->onFilterEvent(events); break; default: break; } for (auto&& event : events) { mCallbackScheduler.onFilterEvent(std::move(event)); } return startFilterLoop(); } Loading Loading @@ -327,15 +459,14 @@ Filter::~Filter() { if (newScramblingStatus ^ mScramblingStatusMonitored) { mScramblingStatusMonitored = newScramblingStatus; if (mScramblingStatusMonitored) { if (mCallback != nullptr) { if (mCallbackScheduler.hasCallbackRegistered()) { // Assuming current status is always NOT_SCRAMBLED vector<DemuxFilterEvent> events; DemuxFilterMonitorEvent monitorEvent; events.resize(1); monitorEvent.set<DemuxFilterMonitorEvent::Tag::scramblingStatus>( auto monitorEvent = DemuxFilterMonitorEvent::make< DemuxFilterMonitorEvent::Tag::scramblingStatus>( ScramblingStatus::NOT_SCRAMBLED); events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent); mCallback->onFilterEvent(events); auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent); mCallbackScheduler.onFilterEvent(std::move(event)); } else { return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::INVALID_STATE)); Loading @@ -347,14 +478,13 @@ Filter::~Filter() { if (newIpCid ^ mIpCidMonitored) { mIpCidMonitored = newIpCid; if (mIpCidMonitored) { if (mCallback != nullptr) { if (mCallbackScheduler.hasCallbackRegistered()) { // Return random cid vector<DemuxFilterEvent> events; DemuxFilterMonitorEvent monitorEvent; events.resize(1); monitorEvent.set<DemuxFilterMonitorEvent::Tag::cid>(1); events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent); mCallback->onFilterEvent(events); auto monitorEvent = DemuxFilterMonitorEvent::make<DemuxFilterMonitorEvent::Tag::cid>(1); auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent); mCallbackScheduler.onFilterEvent(std::move(event)); } else { return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::INVALID_STATE)); Loading Loading @@ -410,26 +540,26 @@ void Filter::filterThreadLoop() { } // After successfully write, send a callback and wait for the read to be done if (mCallback != nullptr) { if (mCallbackScheduler.hasCallbackRegistered()) { if (mConfigured) { vector<DemuxFilterEvent> startEvent; startEvent.resize(1); startEvent[0].set<DemuxFilterEvent::Tag::startId>(mStartId++); mCallback->onFilterEvent(startEvent); auto startEvent = DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(mStartId++); mCallbackScheduler.onFilterEvent(std::move(startEvent)); mConfigured = false; } mCallback->onFilterEvent(mFilterEvents); for (auto&& event : mFilterEvents) { mCallbackScheduler.onFilterEvent(std::move(event)); } } else { ALOGD("[Filter] filter callback is not configured yet."); mFilterThreadRunning = false; return; } mFilterEvents.resize(0); mFilterEvents.clear(); mFilterStatus = DemuxFilterStatus::DATA_READY; if (mCallback != nullptr) { mCallback->onFilterStatus(mFilterStatus); } mCallbackScheduler.onFilterStatus(mFilterStatus); break; } Loading Loading @@ -460,10 +590,10 @@ void Filter::filterThreadLoop() { continue; } // After successfully write, send a callback and wait for the read to be done if (mCallback != nullptr) { mCallback->onFilterEvent(mFilterEvents); for (auto&& event : mFilterEvents) { mCallbackScheduler.onFilterEvent(std::move(event)); } mFilterEvents.resize(0); mFilterEvents.clear(); break; } // We do not wait for the last read to be done Loading Loading @@ -499,9 +629,7 @@ void Filter::maySendFilterStatusCallback() { DemuxFilterStatus newStatus = checkFilterStatusChange( availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25)); if (mFilterStatus != newStatus) { if (mCallback != nullptr) { mCallback->onFilterStatus(newStatus); } mCallbackScheduler.onFilterStatus(newStatus); mFilterStatus = newStatus; } } Loading Loading @@ -657,9 +785,7 @@ void Filter::updateRecordOutput(vector<int8_t>& data) { ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength); } int size = mFilterEvents.size(); mFilterEvents.resize(size + 1); mFilterEvents[size].set<DemuxFilterEvent::Tag::pes>(pesEvent); mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(pesEvent)); mPesOutput.clear(); } Loading Loading @@ -763,11 +889,7 @@ void Filter::updateRecordOutput(vector<int8_t>& data) { .firstMbInSlice = 0, // random address }; int size; size = mFilterEvents.size(); mFilterEvents.resize(size + 1); mFilterEvents[size].set<DemuxFilterEvent::Tag::tsRecord>(recordEvent); mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(recordEvent)); mRecordFilterOutput.clear(); return ::ndk::ScopedAStatus::ok(); } Loading @@ -789,8 +911,6 @@ bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) { if (!writeDataToFilterMQ(data)) { return false; } int size = mFilterEvents.size(); mFilterEvents.resize(size + 1); DemuxFilterSectionEvent secEvent; secEvent = { // temp dump meta data Loading @@ -799,7 +919,7 @@ bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) { .sectionNum = 1, .dataLength = static_cast<int32_t>(data.size()), }; mFilterEvents[size].set<DemuxFilterEvent::Tag::section>(secEvent); mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(secEvent)); return true; } Loading Loading @@ -888,19 +1008,16 @@ native_handle_t* Filter::createNativeHandle(int fd) { mDataId2Avfd[dataId] = dup(av_fd); // Create mediaEvent and send callback int size = mFilterEvents.size(); mFilterEvents.resize(size + 1); mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory = ::android::dupToAidl(nativeHandle); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength = static_cast<int64_t>(output.size()); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avDataId = static_cast<int64_t>(dataId); auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(); auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>(); mediaEvent.avMemory = ::android::dupToAidl(nativeHandle); mediaEvent.dataLength = static_cast<int64_t>(output.size()); mediaEvent.avDataId = static_cast<int64_t>(dataId); if (mPts) { mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts; mediaEvent.pts = mPts; mPts = 0; } mFilterEvents.push_back(std::move(event)); // Clear and log native_handle_close(nativeHandle); Loading Loading @@ -931,18 +1048,17 @@ native_handle_t* Filter::createNativeHandle(int fd) { } // Create mediaEvent and send callback int size = mFilterEvents.size(); mFilterEvents.resize(size + 1); mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory = ::android::dupToAidl(nativeHandle); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().offset = mSharedAvMemOffset; mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength = static_cast<int64_t>(output.size()); auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(); auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>(); mediaEvent.avMemory = ::android::dupToAidl(nativeHandle); mediaEvent.offset = mSharedAvMemOffset; mediaEvent.dataLength = static_cast<int64_t>(output.size()); if (mPts) { mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts; mediaEvent.pts = mPts; mPts = 0; } mFilterEvents.push_back(std::move(event)); mSharedAvMemOffset += output.size(); // Clear and log Loading tv/tuner/aidl/default/Filter.h +46 −6 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ #include <aidl/android/hardware/tv/tuner/BnFilter.h> #include <aidl/android/hardware/tv/tuner/Constant.h> #include <aidl/android/hardware/tv/tuner/DemuxFilterEvent.h> #include <aidl/android/hardware/tv/tuner/DemuxFilterStatus.h> #include <fmq/AidlMessageQueue.h> #include <inttypes.h> Loading @@ -25,6 +27,7 @@ #include <math.h> #include <sys/stat.h> #include <atomic> #include <condition_variable> #include <set> #include <thread> Loading Loading @@ -52,10 +55,49 @@ const uint32_t BUFFER_SIZE_16M = 0x1000000; class Demux; class Dvr; class Filter : public BnFilter { class FilterCallbackScheduler final { public: Filter(); FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb); ~FilterCallbackScheduler(); void onFilterEvent(DemuxFilterEvent&& event); void onFilterStatus(const DemuxFilterStatus& status); void setTimeDelayHint(int timeDelay); void setDataSizeDelayHint(int dataSizeDelay); bool hasCallbackRegistered() const; private: void start(); void stop(); void threadLoop(); void threadLoopOnce(); static int getDemuxFilterEventDataLength(const DemuxFilterEvent& event); private: std::shared_ptr<IFilterCallback> mCallback; std::thread mCallbackThread; std::atomic<bool> mIsRunning; // mLock protects mCallbackBuffer, mCv, and mDataLength std::mutex mLock; std::vector<DemuxFilterEvent> mCallbackBuffer; std::condition_variable mCv; int mDataLength; // both of these need to be atomic (not just mTimeDelayInMs) as this class // needs to be threadsafe. std::atomic<int> mTimeDelayInMs; std::atomic<int> mDataSizeDelayInBytes; }; class Filter : public BnFilter { friend class FilterCallbackScheduler; public: Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize, const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux); Loading Loading @@ -104,10 +146,8 @@ class Filter : public BnFilter { std::shared_ptr<Demux> mDemux; // Dvr reference once the filter is attached to any std::shared_ptr<Dvr> mDvr = nullptr; /** * Filter callbacks used on filter events or FMQ status */ std::shared_ptr<IFilterCallback> mCallback = nullptr; FilterCallbackScheduler mCallbackScheduler; int64_t mFilterId; int32_t mCid = static_cast<int32_t>(Constant::INVALID_IP_FILTER_CONTEXT_ID); Loading tv/tuner/aidl/vts/functional/FilterTests.h +0 −2 Original line number Diff line number Diff line Loading @@ -74,7 +74,6 @@ class FilterCallback : public BnFilterCallback { void setFilterId(int32_t filterId) { mFilterId = filterId; } void setFilterInterface(std::shared_ptr<IFilter> filter) { mFilter = filter; } void setFilterEventType(FilterEventType type) { mFilterEventType = type; } void setSharedHandle(native_handle_t* sharedHandle) { mAvSharedHandle = sharedHandle; } void setMemSize(uint64_t size) { mAvSharedMemSize = size; } Loading @@ -89,7 +88,6 @@ class FilterCallback : public BnFilterCallback { private: int32_t mFilterId; std::shared_ptr<IFilter> mFilter; FilterEventType mFilterEventType; native_handle_t* mAvSharedHandle = nullptr; uint64_t mAvSharedMemSize = -1; Loading Loading
tv/tuner/aidl/default/Filter.cpp +196 −80 Original line number Diff line number Diff line Loading @@ -34,16 +34,132 @@ namespace tuner { #define WAIT_TIMEOUT 3000000000 Filter::Filter() {} FilterCallbackScheduler::FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb) : mCallback(cb), mDataLength(0), mTimeDelayInMs(0), mDataSizeDelayInBytes(0) { start(); } Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize, const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux) { mType = type; mFilterId = filterId; mBufferSize = bufferSize; mDemux = demux; mCallback = cb; FilterCallbackScheduler::~FilterCallbackScheduler() { stop(); } void FilterCallbackScheduler::onFilterEvent(DemuxFilterEvent&& event) { std::lock_guard<std::mutex> lock(mLock); mCallbackBuffer.push_back(std::move(event)); mDataLength += getDemuxFilterEventDataLength(event); if (mDataLength >= mDataSizeDelayInBytes) { // size limit has been reached, send out events mCv.notify_all(); } } void FilterCallbackScheduler::onFilterStatus(const DemuxFilterStatus& status) { if (mCallback) { mCallback->onFilterStatus(status); } } void FilterCallbackScheduler::setTimeDelayHint(int timeDelay) { // updating the setTimeDelay does not go into effect until the condition // variable times out or is notified. // One possibility is to notify the condition variable right away when the // time delay changes, but I don't see the benefit over waiting for the next // timeout / push, since -- in any case -- this will not change very often. mTimeDelayInMs = timeDelay; } void FilterCallbackScheduler::setDataSizeDelayHint(int dataSizeDelay) { // similar to updating the time delay hint, when mDataSizeDelayInBytes // changes, this will not go into immediate effect, but will wait until the // next filterEvent. // We could technically check the current data length and notify the // condition variable if we wanted to, but again, this may be overkill. mDataSizeDelayInBytes = dataSizeDelay; } bool FilterCallbackScheduler::hasCallbackRegistered() const { return mCallback != nullptr; } void FilterCallbackScheduler::start() { mIsRunning = true; mCallbackThread = std::thread(&FilterCallbackScheduler::threadLoop, this); } void FilterCallbackScheduler::stop() { mIsRunning = false; if (mCallbackThread.joinable()) { mCallbackThread.join(); } } void FilterCallbackScheduler::threadLoop() { while (mIsRunning) { threadLoopOnce(); } } void FilterCallbackScheduler::threadLoopOnce() { std::unique_lock<std::mutex> lock(mLock); // mTimeDelayInMs is an atomic value, so it should be copied into a local // variable before use (to make sure both the if statement and wait_for use // the same value). int timeDelayInMs = mTimeDelayInMs; if (timeDelayInMs > 0) { mCv.wait_for(lock, std::chrono::milliseconds(timeDelayInMs)); } else { // no reason to timeout, just wait until main thread determines it's // okay to send data. mCv.wait(lock); } // condition_variable wait locks mutex on timeout / notify if (!mCallbackBuffer.empty()) { if (mCallback) { mCallback->onFilterEvent(mCallbackBuffer); } mCallbackBuffer.clear(); mDataLength = 0; } lock.unlock(); } int FilterCallbackScheduler::getDemuxFilterEventDataLength(const DemuxFilterEvent& event) { // there is a risk that dataLength could be a negative value, but it // *should* be safe to assume that it is always positive. switch (event.getTag()) { case DemuxFilterEvent::Tag::section: return event.get<DemuxFilterEvent::Tag::section>().dataLength; case DemuxFilterEvent::Tag::media: return event.get<DemuxFilterEvent::Tag::media>().dataLength; case DemuxFilterEvent::Tag::pes: return event.get<DemuxFilterEvent::Tag::pes>().dataLength; case DemuxFilterEvent::Tag::download: return event.get<DemuxFilterEvent::Tag::download>().dataLength; case DemuxFilterEvent::Tag::ipPayload: return event.get<DemuxFilterEvent::Tag::ipPayload>().dataLength; case DemuxFilterEvent::Tag::tsRecord: case DemuxFilterEvent::Tag::mmtpRecord: case DemuxFilterEvent::Tag::temi: case DemuxFilterEvent::Tag::monitorEvent: case DemuxFilterEvent::Tag::startId: // these events do not include a payload and should therefore return // 0. // do not add a default option, so this will not compile when new types // are added. return 0; } } Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize, const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux) : mDemux(demux), mCallbackScheduler(cb), mFilterId(filterId), mBufferSize(bufferSize), mType(type) { switch (mType.mainType) { case DemuxFilterMainType::TS: if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() == Loading Loading @@ -112,9 +228,29 @@ Filter::~Filter() { } ::ndk::ScopedAStatus Filter::setDelayHint(const FilterDelayHint& in_hint) { if (mIsMediaFilter) { // delay hint is not supported for media filters return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::UNAVAILABLE)); } ALOGV("%s", __FUNCTION__); (void)in_hint; // TODO: implement if (in_hint.hintValue < 0) { return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::INVALID_ARGUMENT)); } switch (in_hint.hintType) { case FilterDelayHintType::TIME_DELAY_IN_MS: mCallbackScheduler.setTimeDelayHint(in_hint.hintValue); break; case FilterDelayHintType::DATA_SIZE_DELAY_IN_BYTES: mCallbackScheduler.setDataSizeDelayHint(in_hint.hintValue); break; default: return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::INVALID_ARGUMENT)); } return ::ndk::ScopedAStatus::ok(); } Loading Loading @@ -155,40 +291,36 @@ Filter::~Filter() { ::ndk::ScopedAStatus Filter::start() { ALOGV("%s", __FUNCTION__); mFilterThreadRunning = true; vector<DemuxFilterEvent> events; std::vector<DemuxFilterEvent> events; // All the filter event callbacks in start are for testing purpose. switch (mType.mainType) { case DemuxFilterMainType::TS: createMediaEvent(events); mCallback->onFilterEvent(events); createTsRecordEvent(events); mCallback->onFilterEvent(events); createTemiEvent(events); mCallback->onFilterEvent(events); break; case DemuxFilterMainType::MMTP: createDownloadEvent(events); mCallback->onFilterEvent(events); createMmtpRecordEvent(events); mCallback->onFilterEvent(events); break; case DemuxFilterMainType::IP: createSectionEvent(events); mCallback->onFilterEvent(events); createIpPayloadEvent(events); mCallback->onFilterEvent(events); break; case DemuxFilterMainType::TLV: createMonitorEvent(events); mCallback->onFilterEvent(events); break; case DemuxFilterMainType::ALP: createMonitorEvent(events); mCallback->onFilterEvent(events); break; default: break; } for (auto&& event : events) { mCallbackScheduler.onFilterEvent(std::move(event)); } return startFilterLoop(); } Loading Loading @@ -327,15 +459,14 @@ Filter::~Filter() { if (newScramblingStatus ^ mScramblingStatusMonitored) { mScramblingStatusMonitored = newScramblingStatus; if (mScramblingStatusMonitored) { if (mCallback != nullptr) { if (mCallbackScheduler.hasCallbackRegistered()) { // Assuming current status is always NOT_SCRAMBLED vector<DemuxFilterEvent> events; DemuxFilterMonitorEvent monitorEvent; events.resize(1); monitorEvent.set<DemuxFilterMonitorEvent::Tag::scramblingStatus>( auto monitorEvent = DemuxFilterMonitorEvent::make< DemuxFilterMonitorEvent::Tag::scramblingStatus>( ScramblingStatus::NOT_SCRAMBLED); events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent); mCallback->onFilterEvent(events); auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent); mCallbackScheduler.onFilterEvent(std::move(event)); } else { return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::INVALID_STATE)); Loading @@ -347,14 +478,13 @@ Filter::~Filter() { if (newIpCid ^ mIpCidMonitored) { mIpCidMonitored = newIpCid; if (mIpCidMonitored) { if (mCallback != nullptr) { if (mCallbackScheduler.hasCallbackRegistered()) { // Return random cid vector<DemuxFilterEvent> events; DemuxFilterMonitorEvent monitorEvent; events.resize(1); monitorEvent.set<DemuxFilterMonitorEvent::Tag::cid>(1); events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent); mCallback->onFilterEvent(events); auto monitorEvent = DemuxFilterMonitorEvent::make<DemuxFilterMonitorEvent::Tag::cid>(1); auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent); mCallbackScheduler.onFilterEvent(std::move(event)); } else { return ::ndk::ScopedAStatus::fromServiceSpecificError( static_cast<int32_t>(Result::INVALID_STATE)); Loading Loading @@ -410,26 +540,26 @@ void Filter::filterThreadLoop() { } // After successfully write, send a callback and wait for the read to be done if (mCallback != nullptr) { if (mCallbackScheduler.hasCallbackRegistered()) { if (mConfigured) { vector<DemuxFilterEvent> startEvent; startEvent.resize(1); startEvent[0].set<DemuxFilterEvent::Tag::startId>(mStartId++); mCallback->onFilterEvent(startEvent); auto startEvent = DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(mStartId++); mCallbackScheduler.onFilterEvent(std::move(startEvent)); mConfigured = false; } mCallback->onFilterEvent(mFilterEvents); for (auto&& event : mFilterEvents) { mCallbackScheduler.onFilterEvent(std::move(event)); } } else { ALOGD("[Filter] filter callback is not configured yet."); mFilterThreadRunning = false; return; } mFilterEvents.resize(0); mFilterEvents.clear(); mFilterStatus = DemuxFilterStatus::DATA_READY; if (mCallback != nullptr) { mCallback->onFilterStatus(mFilterStatus); } mCallbackScheduler.onFilterStatus(mFilterStatus); break; } Loading Loading @@ -460,10 +590,10 @@ void Filter::filterThreadLoop() { continue; } // After successfully write, send a callback and wait for the read to be done if (mCallback != nullptr) { mCallback->onFilterEvent(mFilterEvents); for (auto&& event : mFilterEvents) { mCallbackScheduler.onFilterEvent(std::move(event)); } mFilterEvents.resize(0); mFilterEvents.clear(); break; } // We do not wait for the last read to be done Loading Loading @@ -499,9 +629,7 @@ void Filter::maySendFilterStatusCallback() { DemuxFilterStatus newStatus = checkFilterStatusChange( availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25)); if (mFilterStatus != newStatus) { if (mCallback != nullptr) { mCallback->onFilterStatus(newStatus); } mCallbackScheduler.onFilterStatus(newStatus); mFilterStatus = newStatus; } } Loading Loading @@ -657,9 +785,7 @@ void Filter::updateRecordOutput(vector<int8_t>& data) { ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength); } int size = mFilterEvents.size(); mFilterEvents.resize(size + 1); mFilterEvents[size].set<DemuxFilterEvent::Tag::pes>(pesEvent); mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(pesEvent)); mPesOutput.clear(); } Loading Loading @@ -763,11 +889,7 @@ void Filter::updateRecordOutput(vector<int8_t>& data) { .firstMbInSlice = 0, // random address }; int size; size = mFilterEvents.size(); mFilterEvents.resize(size + 1); mFilterEvents[size].set<DemuxFilterEvent::Tag::tsRecord>(recordEvent); mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(recordEvent)); mRecordFilterOutput.clear(); return ::ndk::ScopedAStatus::ok(); } Loading @@ -789,8 +911,6 @@ bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) { if (!writeDataToFilterMQ(data)) { return false; } int size = mFilterEvents.size(); mFilterEvents.resize(size + 1); DemuxFilterSectionEvent secEvent; secEvent = { // temp dump meta data Loading @@ -799,7 +919,7 @@ bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) { .sectionNum = 1, .dataLength = static_cast<int32_t>(data.size()), }; mFilterEvents[size].set<DemuxFilterEvent::Tag::section>(secEvent); mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(secEvent)); return true; } Loading Loading @@ -888,19 +1008,16 @@ native_handle_t* Filter::createNativeHandle(int fd) { mDataId2Avfd[dataId] = dup(av_fd); // Create mediaEvent and send callback int size = mFilterEvents.size(); mFilterEvents.resize(size + 1); mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory = ::android::dupToAidl(nativeHandle); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength = static_cast<int64_t>(output.size()); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avDataId = static_cast<int64_t>(dataId); auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(); auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>(); mediaEvent.avMemory = ::android::dupToAidl(nativeHandle); mediaEvent.dataLength = static_cast<int64_t>(output.size()); mediaEvent.avDataId = static_cast<int64_t>(dataId); if (mPts) { mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts; mediaEvent.pts = mPts; mPts = 0; } mFilterEvents.push_back(std::move(event)); // Clear and log native_handle_close(nativeHandle); Loading Loading @@ -931,18 +1048,17 @@ native_handle_t* Filter::createNativeHandle(int fd) { } // Create mediaEvent and send callback int size = mFilterEvents.size(); mFilterEvents.resize(size + 1); mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory = ::android::dupToAidl(nativeHandle); mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().offset = mSharedAvMemOffset; mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength = static_cast<int64_t>(output.size()); auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>(); auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>(); mediaEvent.avMemory = ::android::dupToAidl(nativeHandle); mediaEvent.offset = mSharedAvMemOffset; mediaEvent.dataLength = static_cast<int64_t>(output.size()); if (mPts) { mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts; mediaEvent.pts = mPts; mPts = 0; } mFilterEvents.push_back(std::move(event)); mSharedAvMemOffset += output.size(); // Clear and log Loading
tv/tuner/aidl/default/Filter.h +46 −6 Original line number Diff line number Diff line Loading @@ -18,6 +18,8 @@ #include <aidl/android/hardware/tv/tuner/BnFilter.h> #include <aidl/android/hardware/tv/tuner/Constant.h> #include <aidl/android/hardware/tv/tuner/DemuxFilterEvent.h> #include <aidl/android/hardware/tv/tuner/DemuxFilterStatus.h> #include <fmq/AidlMessageQueue.h> #include <inttypes.h> Loading @@ -25,6 +27,7 @@ #include <math.h> #include <sys/stat.h> #include <atomic> #include <condition_variable> #include <set> #include <thread> Loading Loading @@ -52,10 +55,49 @@ const uint32_t BUFFER_SIZE_16M = 0x1000000; class Demux; class Dvr; class Filter : public BnFilter { class FilterCallbackScheduler final { public: Filter(); FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb); ~FilterCallbackScheduler(); void onFilterEvent(DemuxFilterEvent&& event); void onFilterStatus(const DemuxFilterStatus& status); void setTimeDelayHint(int timeDelay); void setDataSizeDelayHint(int dataSizeDelay); bool hasCallbackRegistered() const; private: void start(); void stop(); void threadLoop(); void threadLoopOnce(); static int getDemuxFilterEventDataLength(const DemuxFilterEvent& event); private: std::shared_ptr<IFilterCallback> mCallback; std::thread mCallbackThread; std::atomic<bool> mIsRunning; // mLock protects mCallbackBuffer, mCv, and mDataLength std::mutex mLock; std::vector<DemuxFilterEvent> mCallbackBuffer; std::condition_variable mCv; int mDataLength; // both of these need to be atomic (not just mTimeDelayInMs) as this class // needs to be threadsafe. std::atomic<int> mTimeDelayInMs; std::atomic<int> mDataSizeDelayInBytes; }; class Filter : public BnFilter { friend class FilterCallbackScheduler; public: Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize, const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux); Loading Loading @@ -104,10 +146,8 @@ class Filter : public BnFilter { std::shared_ptr<Demux> mDemux; // Dvr reference once the filter is attached to any std::shared_ptr<Dvr> mDvr = nullptr; /** * Filter callbacks used on filter events or FMQ status */ std::shared_ptr<IFilterCallback> mCallback = nullptr; FilterCallbackScheduler mCallbackScheduler; int64_t mFilterId; int32_t mCid = static_cast<int32_t>(Constant::INVALID_IP_FILTER_CONTEXT_ID); Loading
tv/tuner/aidl/vts/functional/FilterTests.h +0 −2 Original line number Diff line number Diff line Loading @@ -74,7 +74,6 @@ class FilterCallback : public BnFilterCallback { void setFilterId(int32_t filterId) { mFilterId = filterId; } void setFilterInterface(std::shared_ptr<IFilter> filter) { mFilter = filter; } void setFilterEventType(FilterEventType type) { mFilterEventType = type; } void setSharedHandle(native_handle_t* sharedHandle) { mAvSharedHandle = sharedHandle; } void setMemSize(uint64_t size) { mAvSharedMemSize = size; } Loading @@ -89,7 +88,6 @@ class FilterCallback : public BnFilterCallback { private: int32_t mFilterId; std::shared_ptr<IFilter> mFilter; FilterEventType mFilterEventType; native_handle_t* mAvSharedHandle = nullptr; uint64_t mAvSharedMemSize = -1; Loading