Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1ac8d4a6 authored by Patrick Rohr's avatar Patrick Rohr
Browse files

add filter delay hint default implementation

Adds a event buffering / scheduling mechanism that is configured using
the delay hint.

Bug: 183057734
Test: atest VtsHalTvTunerTargetTest
Change-Id: I154eb05bc419f827008161f85a6304a8599dc399
parent 6a4e929c
Loading
Loading
Loading
Loading
+196 −80
Original line number Diff line number Diff line
@@ -34,16 +34,132 @@ namespace tuner {

#define WAIT_TIMEOUT 3000000000

Filter::Filter() {}
FilterCallbackScheduler::FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb)
    : mCallback(cb), mDataLength(0), mTimeDelayInMs(0), mDataSizeDelayInBytes(0) {
    start();
}

Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
               const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux) {
    mType = type;
    mFilterId = filterId;
    mBufferSize = bufferSize;
    mDemux = demux;
    mCallback = cb;
FilterCallbackScheduler::~FilterCallbackScheduler() {
    stop();
}

void FilterCallbackScheduler::onFilterEvent(DemuxFilterEvent&& event) {
    std::lock_guard<std::mutex> lock(mLock);
    mCallbackBuffer.push_back(std::move(event));
    mDataLength += getDemuxFilterEventDataLength(event);

    if (mDataLength >= mDataSizeDelayInBytes) {
        // size limit has been reached, send out events
        mCv.notify_all();
    }
}

void FilterCallbackScheduler::onFilterStatus(const DemuxFilterStatus& status) {
    if (mCallback) {
        mCallback->onFilterStatus(status);
    }
}

void FilterCallbackScheduler::setTimeDelayHint(int timeDelay) {
    // updating the setTimeDelay does not go into effect until the condition
    // variable times out or is notified.
    // One possibility is to notify the condition variable right away when the
    // time delay changes, but I don't see the benefit over waiting for the next
    // timeout / push, since -- in any case -- this will not change very often.
    mTimeDelayInMs = timeDelay;
}

void FilterCallbackScheduler::setDataSizeDelayHint(int dataSizeDelay) {
    // similar to updating the time delay hint, when mDataSizeDelayInBytes
    // changes, this will not go into immediate effect, but will wait until the
    // next filterEvent.
    // We could technically check the current data length and notify the
    // condition variable if we wanted to, but again, this may be overkill.
    mDataSizeDelayInBytes = dataSizeDelay;
}

bool FilterCallbackScheduler::hasCallbackRegistered() const {
    return mCallback != nullptr;
}

void FilterCallbackScheduler::start() {
    mIsRunning = true;
    mCallbackThread = std::thread(&FilterCallbackScheduler::threadLoop, this);
}

void FilterCallbackScheduler::stop() {
    mIsRunning = false;
    if (mCallbackThread.joinable()) {
        mCallbackThread.join();
    }
}

void FilterCallbackScheduler::threadLoop() {
    while (mIsRunning) {
        threadLoopOnce();
    }
}

void FilterCallbackScheduler::threadLoopOnce() {
    std::unique_lock<std::mutex> lock(mLock);
    // mTimeDelayInMs is an atomic value, so it should be copied into a local
    // variable before use (to make sure both the if statement and wait_for use
    // the same value).
    int timeDelayInMs = mTimeDelayInMs;
    if (timeDelayInMs > 0) {
        mCv.wait_for(lock, std::chrono::milliseconds(timeDelayInMs));
    } else {
        // no reason to timeout, just wait until main thread determines it's
        // okay to send data.
        mCv.wait(lock);
    }

    // condition_variable wait locks mutex on timeout / notify
    if (!mCallbackBuffer.empty()) {
        if (mCallback) {
            mCallback->onFilterEvent(mCallbackBuffer);
        }
        mCallbackBuffer.clear();
        mDataLength = 0;
    }
    lock.unlock();
}

int FilterCallbackScheduler::getDemuxFilterEventDataLength(const DemuxFilterEvent& event) {
    // there is a risk that dataLength could be a negative value, but it
    // *should* be safe to assume that it is always positive.
    switch (event.getTag()) {
        case DemuxFilterEvent::Tag::section:
            return event.get<DemuxFilterEvent::Tag::section>().dataLength;
        case DemuxFilterEvent::Tag::media:
            return event.get<DemuxFilterEvent::Tag::media>().dataLength;
        case DemuxFilterEvent::Tag::pes:
            return event.get<DemuxFilterEvent::Tag::pes>().dataLength;
        case DemuxFilterEvent::Tag::download:
            return event.get<DemuxFilterEvent::Tag::download>().dataLength;
        case DemuxFilterEvent::Tag::ipPayload:
            return event.get<DemuxFilterEvent::Tag::ipPayload>().dataLength;

        case DemuxFilterEvent::Tag::tsRecord:
        case DemuxFilterEvent::Tag::mmtpRecord:
        case DemuxFilterEvent::Tag::temi:
        case DemuxFilterEvent::Tag::monitorEvent:
        case DemuxFilterEvent::Tag::startId:
            // these events do not include a payload and should therefore return
            // 0.
            // do not add a default option, so this will not compile when new types
            // are added.
            return 0;
    }
}

Filter::Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
               const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux)
    : mDemux(demux),
      mCallbackScheduler(cb),
      mFilterId(filterId),
      mBufferSize(bufferSize),
      mType(type) {
    switch (mType.mainType) {
        case DemuxFilterMainType::TS:
            if (mType.subType.get<DemuxFilterSubType::Tag::tsFilterType>() ==
@@ -112,9 +228,29 @@ Filter::~Filter() {
}

::ndk::ScopedAStatus Filter::setDelayHint(const FilterDelayHint& in_hint) {
    if (mIsMediaFilter) {
        // delay hint is not supported for media filters
        return ::ndk::ScopedAStatus::fromServiceSpecificError(
                static_cast<int32_t>(Result::UNAVAILABLE));
    }

    ALOGV("%s", __FUNCTION__);
    (void)in_hint;
    // TODO: implement
    if (in_hint.hintValue < 0) {
        return ::ndk::ScopedAStatus::fromServiceSpecificError(
                static_cast<int32_t>(Result::INVALID_ARGUMENT));
    }

    switch (in_hint.hintType) {
        case FilterDelayHintType::TIME_DELAY_IN_MS:
            mCallbackScheduler.setTimeDelayHint(in_hint.hintValue);
            break;
        case FilterDelayHintType::DATA_SIZE_DELAY_IN_BYTES:
            mCallbackScheduler.setDataSizeDelayHint(in_hint.hintValue);
            break;
        default:
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
    }

    return ::ndk::ScopedAStatus::ok();
}
@@ -155,40 +291,36 @@ Filter::~Filter() {
::ndk::ScopedAStatus Filter::start() {
    ALOGV("%s", __FUNCTION__);
    mFilterThreadRunning = true;
    vector<DemuxFilterEvent> events;
    std::vector<DemuxFilterEvent> events;
    // All the filter event callbacks in start are for testing purpose.
    switch (mType.mainType) {
        case DemuxFilterMainType::TS:
            createMediaEvent(events);
            mCallback->onFilterEvent(events);
            createTsRecordEvent(events);
            mCallback->onFilterEvent(events);
            createTemiEvent(events);
            mCallback->onFilterEvent(events);
            break;
        case DemuxFilterMainType::MMTP:
            createDownloadEvent(events);
            mCallback->onFilterEvent(events);
            createMmtpRecordEvent(events);
            mCallback->onFilterEvent(events);
            break;
        case DemuxFilterMainType::IP:
            createSectionEvent(events);
            mCallback->onFilterEvent(events);
            createIpPayloadEvent(events);
            mCallback->onFilterEvent(events);
            break;
        case DemuxFilterMainType::TLV:
            createMonitorEvent(events);
            mCallback->onFilterEvent(events);
            break;
        case DemuxFilterMainType::ALP:
            createMonitorEvent(events);
            mCallback->onFilterEvent(events);
            break;
        default:
            break;
    }

    for (auto&& event : events) {
        mCallbackScheduler.onFilterEvent(std::move(event));
    }

    return startFilterLoop();
}

@@ -327,15 +459,14 @@ Filter::~Filter() {
    if (newScramblingStatus ^ mScramblingStatusMonitored) {
        mScramblingStatusMonitored = newScramblingStatus;
        if (mScramblingStatusMonitored) {
            if (mCallback != nullptr) {
            if (mCallbackScheduler.hasCallbackRegistered()) {
                // Assuming current status is always NOT_SCRAMBLED
                vector<DemuxFilterEvent> events;
                DemuxFilterMonitorEvent monitorEvent;
                events.resize(1);
                monitorEvent.set<DemuxFilterMonitorEvent::Tag::scramblingStatus>(
                auto monitorEvent = DemuxFilterMonitorEvent::make<
                        DemuxFilterMonitorEvent::Tag::scramblingStatus>(
                        ScramblingStatus::NOT_SCRAMBLED);
                events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent);
                mCallback->onFilterEvent(events);
                auto event =
                        DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
                mCallbackScheduler.onFilterEvent(std::move(event));
            } else {
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::INVALID_STATE));
@@ -347,14 +478,13 @@ Filter::~Filter() {
    if (newIpCid ^ mIpCidMonitored) {
        mIpCidMonitored = newIpCid;
        if (mIpCidMonitored) {
            if (mCallback != nullptr) {
            if (mCallbackScheduler.hasCallbackRegistered()) {
                // Return random cid
                vector<DemuxFilterEvent> events;
                DemuxFilterMonitorEvent monitorEvent;
                events.resize(1);
                monitorEvent.set<DemuxFilterMonitorEvent::Tag::cid>(1);
                events[0].set<DemuxFilterEvent::monitorEvent>(monitorEvent);
                mCallback->onFilterEvent(events);
                auto monitorEvent =
                        DemuxFilterMonitorEvent::make<DemuxFilterMonitorEvent::Tag::cid>(1);
                auto event =
                        DemuxFilterEvent::make<DemuxFilterEvent::Tag::monitorEvent>(monitorEvent);
                mCallbackScheduler.onFilterEvent(std::move(event));
            } else {
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::INVALID_STATE));
@@ -410,26 +540,26 @@ void Filter::filterThreadLoop() {
        }

        // After successfully write, send a callback and wait for the read to be done
        if (mCallback != nullptr) {
        if (mCallbackScheduler.hasCallbackRegistered()) {
            if (mConfigured) {
                vector<DemuxFilterEvent> startEvent;
                startEvent.resize(1);
                startEvent[0].set<DemuxFilterEvent::Tag::startId>(mStartId++);
                mCallback->onFilterEvent(startEvent);
                auto startEvent =
                        DemuxFilterEvent::make<DemuxFilterEvent::Tag::startId>(mStartId++);
                mCallbackScheduler.onFilterEvent(std::move(startEvent));
                mConfigured = false;
            }
            mCallback->onFilterEvent(mFilterEvents);

            for (auto&& event : mFilterEvents) {
                mCallbackScheduler.onFilterEvent(std::move(event));
            }
        } else {
            ALOGD("[Filter] filter callback is not configured yet.");
            mFilterThreadRunning = false;
            return;
        }

        mFilterEvents.resize(0);
        mFilterEvents.clear();
        mFilterStatus = DemuxFilterStatus::DATA_READY;
        if (mCallback != nullptr) {
            mCallback->onFilterStatus(mFilterStatus);
        }
        mCallbackScheduler.onFilterStatus(mFilterStatus);
        break;
    }

@@ -460,10 +590,10 @@ void Filter::filterThreadLoop() {
                    continue;
                }
                // After successfully write, send a callback and wait for the read to be done
                if (mCallback != nullptr) {
                    mCallback->onFilterEvent(mFilterEvents);
                for (auto&& event : mFilterEvents) {
                    mCallbackScheduler.onFilterEvent(std::move(event));
                }
                mFilterEvents.resize(0);
                mFilterEvents.clear();
                break;
            }
            // We do not wait for the last read to be done
@@ -499,9 +629,7 @@ void Filter::maySendFilterStatusCallback() {
    DemuxFilterStatus newStatus = checkFilterStatusChange(
            availableToWrite, availableToRead, ceil(fmqSize * 0.75), ceil(fmqSize * 0.25));
    if (mFilterStatus != newStatus) {
        if (mCallback != nullptr) {
            mCallback->onFilterStatus(newStatus);
        }
        mCallbackScheduler.onFilterStatus(newStatus);
        mFilterStatus = newStatus;
    }
}
@@ -657,9 +785,7 @@ void Filter::updateRecordOutput(vector<int8_t>& data) {
            ALOGD("[Filter] assembled pes data length %d", pesEvent.dataLength);
        }

        int size = mFilterEvents.size();
        mFilterEvents.resize(size + 1);
        mFilterEvents[size].set<DemuxFilterEvent::Tag::pes>(pesEvent);
        mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::pes>(pesEvent));
        mPesOutput.clear();
    }

@@ -763,11 +889,7 @@ void Filter::updateRecordOutput(vector<int8_t>& data) {
            .firstMbInSlice = 0,  // random address
    };

    int size;
    size = mFilterEvents.size();
    mFilterEvents.resize(size + 1);
    mFilterEvents[size].set<DemuxFilterEvent::Tag::tsRecord>(recordEvent);

    mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::tsRecord>(recordEvent));
    mRecordFilterOutput.clear();
    return ::ndk::ScopedAStatus::ok();
}
@@ -789,8 +911,6 @@ bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) {
    if (!writeDataToFilterMQ(data)) {
        return false;
    }
    int size = mFilterEvents.size();
    mFilterEvents.resize(size + 1);
    DemuxFilterSectionEvent secEvent;
    secEvent = {
            // temp dump meta data
@@ -799,7 +919,7 @@ bool Filter::writeSectionsAndCreateEvent(vector<int8_t>& data) {
            .sectionNum = 1,
            .dataLength = static_cast<int32_t>(data.size()),
    };
    mFilterEvents[size].set<DemuxFilterEvent::Tag::section>(secEvent);
    mFilterEvents.push_back(DemuxFilterEvent::make<DemuxFilterEvent::Tag::section>(secEvent));
    return true;
}

@@ -888,19 +1008,16 @@ native_handle_t* Filter::createNativeHandle(int fd) {
    mDataId2Avfd[dataId] = dup(av_fd);

    // Create mediaEvent and send callback
    int size = mFilterEvents.size();
    mFilterEvents.resize(size + 1);

    mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory =
            ::android::dupToAidl(nativeHandle);
    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength =
            static_cast<int64_t>(output.size());
    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avDataId = static_cast<int64_t>(dataId);
    auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
    auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
    mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
    mediaEvent.dataLength = static_cast<int64_t>(output.size());
    mediaEvent.avDataId = static_cast<int64_t>(dataId);
    if (mPts) {
        mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts;
        mediaEvent.pts = mPts;
        mPts = 0;
    }
    mFilterEvents.push_back(std::move(event));

    // Clear and log
    native_handle_close(nativeHandle);
@@ -931,18 +1048,17 @@ native_handle_t* Filter::createNativeHandle(int fd) {
    }

    // Create mediaEvent and send callback
    int size = mFilterEvents.size();
    mFilterEvents.resize(size + 1);
    mFilterEvents[size] = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().avMemory =
            ::android::dupToAidl(nativeHandle);
    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().offset = mSharedAvMemOffset;
    mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().dataLength =
            static_cast<int64_t>(output.size());
    auto event = DemuxFilterEvent::make<DemuxFilterEvent::Tag::media>();
    auto& mediaEvent = event.get<DemuxFilterEvent::Tag::media>();
    mediaEvent.avMemory = ::android::dupToAidl(nativeHandle);
    mediaEvent.offset = mSharedAvMemOffset;
    mediaEvent.dataLength = static_cast<int64_t>(output.size());
    if (mPts) {
        mFilterEvents[size].get<DemuxFilterEvent::Tag::media>().pts = mPts;
        mediaEvent.pts = mPts;
        mPts = 0;
    }
    mFilterEvents.push_back(std::move(event));

    mSharedAvMemOffset += output.size();

    // Clear and log
+46 −6
Original line number Diff line number Diff line
@@ -18,6 +18,8 @@

#include <aidl/android/hardware/tv/tuner/BnFilter.h>
#include <aidl/android/hardware/tv/tuner/Constant.h>
#include <aidl/android/hardware/tv/tuner/DemuxFilterEvent.h>
#include <aidl/android/hardware/tv/tuner/DemuxFilterStatus.h>

#include <fmq/AidlMessageQueue.h>
#include <inttypes.h>
@@ -25,6 +27,7 @@
#include <math.h>
#include <sys/stat.h>
#include <atomic>
#include <condition_variable>
#include <set>
#include <thread>

@@ -52,10 +55,49 @@ const uint32_t BUFFER_SIZE_16M = 0x1000000;
class Demux;
class Dvr;

class Filter : public BnFilter {
class FilterCallbackScheduler final {
  public:
    Filter();
    FilterCallbackScheduler(const std::shared_ptr<IFilterCallback>& cb);
    ~FilterCallbackScheduler();

    void onFilterEvent(DemuxFilterEvent&& event);
    void onFilterStatus(const DemuxFilterStatus& status);

    void setTimeDelayHint(int timeDelay);
    void setDataSizeDelayHint(int dataSizeDelay);

    bool hasCallbackRegistered() const;

  private:
    void start();
    void stop();

    void threadLoop();
    void threadLoopOnce();

    static int getDemuxFilterEventDataLength(const DemuxFilterEvent& event);

  private:
    std::shared_ptr<IFilterCallback> mCallback;
    std::thread mCallbackThread;
    std::atomic<bool> mIsRunning;

    // mLock protects mCallbackBuffer, mCv, and mDataLength
    std::mutex mLock;
    std::vector<DemuxFilterEvent> mCallbackBuffer;
    std::condition_variable mCv;
    int mDataLength;

    // both of these need to be atomic (not just mTimeDelayInMs) as this class
    // needs to be threadsafe.
    std::atomic<int> mTimeDelayInMs;
    std::atomic<int> mDataSizeDelayInBytes;
};

class Filter : public BnFilter {
    friend class FilterCallbackScheduler;

  public:
    Filter(DemuxFilterType type, int64_t filterId, uint32_t bufferSize,
           const std::shared_ptr<IFilterCallback>& cb, std::shared_ptr<Demux> demux);

@@ -104,10 +146,8 @@ class Filter : public BnFilter {
    std::shared_ptr<Demux> mDemux;
    // Dvr reference once the filter is attached to any
    std::shared_ptr<Dvr> mDvr = nullptr;
    /**
     * Filter callbacks used on filter events or FMQ status
     */
    std::shared_ptr<IFilterCallback> mCallback = nullptr;

    FilterCallbackScheduler mCallbackScheduler;

    int64_t mFilterId;
    int32_t mCid = static_cast<int32_t>(Constant::INVALID_IP_FILTER_CONTEXT_ID);