Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 56c98294 authored by sadiqsada's avatar sadiqsada
Browse files

Add IPTV default implementation

Frontend::tune(): create a streamer using plugin interface to
read a byte and return LOCKED event if byte is read

Demux::setFrontendDataSource():open a new stream to read data
from the socket and push the data read to DVR FMQ.

Test: atest VtsHalTvTunerTargetTest
Bug: 288170590
Change-Id: Iaf2eae7b4dc9e7d69b1f7b3a367d24f6acdd68be
parent 51cc83a7
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ cc_defaults {
        "TimeFilter.cpp",
        "Tuner.cpp",
        "service.cpp",
        "dtv_plugin.cpp",
    ],
    static_libs: [
        "libaidlcommonsupport",
+168 −56
Original line number Diff line number Diff line
@@ -20,7 +20,9 @@
#include <aidl/android/hardware/tv/tuner/DemuxQueueNotifyBits.h>
#include <aidl/android/hardware/tv/tuner/Result.h>

#include <fmq/AidlMessageQueue.h>
#include <utils/Log.h>
#include <thread>
#include "Demux.h"

namespace aidl {
@@ -29,6 +31,15 @@ namespace hardware {
namespace tv {
namespace tuner {

using ::aidl::android::hardware::common::fmq::MQDescriptor;
using ::aidl::android::hardware::common::fmq::SynchronizedReadWrite;
using ::android::AidlMessageQueue;
using ::android::hardware::EventFlag;

using FilterMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;
using AidlMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;
using AidlMQDesc = MQDescriptor<int8_t, SynchronizedReadWrite>;

#define WAIT_TIMEOUT 3000000000

Demux::Demux(int32_t demuxId, uint32_t filterTypes) {
@@ -45,6 +56,111 @@ Demux::~Demux() {
    close();
}

::ndk::ScopedAStatus Demux::openDvr(DvrType in_type, int32_t in_bufferSize,
                                    const std::shared_ptr<IDvrCallback>& in_cb,
                                    std::shared_ptr<IDvr>* _aidl_return) {
    ALOGV("%s", __FUNCTION__);

    if (in_cb == nullptr) {
        ALOGW("[Demux] DVR callback can't be null");
        *_aidl_return = nullptr;
        return ::ndk::ScopedAStatus::fromServiceSpecificError(
                static_cast<int32_t>(Result::INVALID_ARGUMENT));
    }

    set<int64_t>::iterator it;
    switch (in_type) {
        case DvrType::PLAYBACK:
            mDvrPlayback = ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb,
                                                         this->ref<Demux>());
            if (!mDvrPlayback->createDvrMQ()) {
                ALOGE("[Demux] cannot create dvr message queue");
                mDvrPlayback = nullptr;
                *_aidl_return = mDvrPlayback;
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::UNKNOWN_ERROR));
            }

            for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
                if (!mDvrPlayback->addPlaybackFilter(*it, mFilters[*it])) {
                    ALOGE("[Demux] Can't get filter info for DVR playback");
                    mDvrPlayback = nullptr;
                    *_aidl_return = mDvrPlayback;
                    return ::ndk::ScopedAStatus::fromServiceSpecificError(
                            static_cast<int32_t>(Result::UNKNOWN_ERROR));
                }
            }

            ALOGI("Playback normal case");

            *_aidl_return = mDvrPlayback;
            return ::ndk::ScopedAStatus::ok();
        case DvrType::RECORD:
            mDvrRecord = ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb,
                                                       this->ref<Demux>());
            if (!mDvrRecord->createDvrMQ()) {
                mDvrRecord = nullptr;
                *_aidl_return = mDvrRecord;
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::UNKNOWN_ERROR));
            }

            *_aidl_return = mDvrRecord;
            return ::ndk::ScopedAStatus::ok();
        default:
            *_aidl_return = nullptr;
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
    }
}

void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf,
                               size_t buf_size, int timeout_ms, int buffer_timeout) {
    Timer *timer, *fullBufferTimer;
    while (mDemuxIptvReadThreadRunning) {
        if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) {
            ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout);
            delete fullBufferTimer;
            break;
        }
        timer = new Timer();
        ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms);
        if (bytes_read == 0) {
            double elapsed_time = timer->get_elapsed_time_ms();
            if (elapsed_time > timeout_ms) {
                ALOGE("[Demux] timeout reached - elapsed_time: %f, timeout: %d", elapsed_time,
                      timeout_ms);
            }
            ALOGE("[Demux] Cannot read data from the socket");
            delete timer;
            break;
        }

        delete timer;
        ALOGI("Number of bytes read: %zd", bytes_read);
        int result = mDvrPlayback->writePlaybackFMQ(buf, bytes_read);

        switch (result) {
            case DVR_WRITE_FAILURE_REASON_FMQ_FULL:
                if (!mIsIptvDvrFMQFull) {
                    mIsIptvDvrFMQFull = true;
                    fullBufferTimer = new Timer();
                }
                ALOGI("Waiting for client to flush DVR FMQ.");
                break;
            case DVR_WRITE_FAILURE_REASON_UNKNOWN:
                ALOGE("Failed to write data into DVR FMQ for unknown reason");
                break;
            case DVR_WRITE_SUCCESS:
                ALOGI("Wrote %zd bytes to DVR FMQ", bytes_read);
                break;
            default:
                ALOGI("Invalid DVR Status");
        }
    }
    mDemuxIptvReadThreadRunning = false;
}

::ndk::ScopedAStatus Demux::setFrontendDataSource(int32_t in_frontendId) {
    ALOGV("%s", __FUNCTION__);

@@ -52,7 +168,6 @@ Demux::~Demux() {
        return ::ndk::ScopedAStatus::fromServiceSpecificError(
                static_cast<int32_t>(Result::NOT_INITIALIZED));
    }

    mFrontend = mTuner->getFrontendById(in_frontendId);
    if (mFrontend == nullptr) {
        return ::ndk::ScopedAStatus::fromServiceSpecificError(
@@ -61,6 +176,58 @@ Demux::~Demux() {

    mTuner->setFrontendAsDemuxSource(in_frontendId, mDemuxId);

    // if mFrontend is an IPTV frontend, create streamer to read TS data from socket
    if (mFrontend->getFrontendType() == FrontendType::IPTV) {
        // create a DVR instance on the demux
        shared_ptr<IDvr> iptvDvr;

        std::shared_ptr<IDvrCallback> dvrPlaybackCallback =
                ::ndk::SharedRefBase::make<DvrPlaybackCallback>();

        ::ndk::ScopedAStatus status =
                openDvr(DvrType::PLAYBACK, IPTV_BUFFER_SIZE, dvrPlaybackCallback, &iptvDvr);
        if (status.isOk()) {
            ALOGI("DVR instance created");
        }

        // get plugin interface from frontend
        dtv_plugin* interface = mFrontend->getIptvPluginInterface();
        if (interface == nullptr) {
            ALOGE("[Demux] getIptvPluginInterface(): plugin interface is null");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_STATE));
        }
        ALOGI("[Demux] getIptvPluginInterface(): plugin interface is not null");

        // get streamer object from Frontend instance
        dtv_streamer* streamer = mFrontend->getIptvPluginStreamer();
        if (streamer == nullptr) {
            ALOGE("[Demux] getIptvPluginStreamer(): streamer is null");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_STATE));
        }
        ALOGI("[Demux] getIptvPluginStreamer(): streamer is not null");

        // get transport description from frontend
        string transport_desc = mFrontend->getIptvTransportDescription();
        ALOGI("[Demux] getIptvTransportDescription(): transport_desc: %s", transport_desc.c_str());

        // call read_stream on the socket to populate the buffer with TS data
        // while thread is alive, keep reading data
        int timeout_ms = 20;
        int buffer_timeout = 10000;  // 10s
        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
        if (buf == nullptr) ALOGI("malloc buf failed");
        ALOGI("[   INFO   ] Allocated buffer of size %d", IPTV_BUFFER_SIZE);
        ALOGI("Getting FMQ from DVR instance to write socket data");
        mDemuxIptvReadThreadRunning = true;
        mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer,
                                           buf, IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout);
        if (mDemuxIptvReadThread.joinable()) {
            mDemuxIptvReadThread.join();
        }
        free(buf);
    }
    return ::ndk::ScopedAStatus::ok();
}

@@ -193,61 +360,6 @@ Demux::~Demux() {
    return ::ndk::ScopedAStatus::ok();
}

::ndk::ScopedAStatus Demux::openDvr(DvrType in_type, int32_t in_bufferSize,
                                    const std::shared_ptr<IDvrCallback>& in_cb,
                                    std::shared_ptr<IDvr>* _aidl_return) {
    ALOGV("%s", __FUNCTION__);

    if (in_cb == nullptr) {
        ALOGW("[Demux] DVR callback can't be null");
        *_aidl_return = nullptr;
        return ::ndk::ScopedAStatus::fromServiceSpecificError(
                static_cast<int32_t>(Result::INVALID_ARGUMENT));
    }

    set<int64_t>::iterator it;
    switch (in_type) {
        case DvrType::PLAYBACK:
            mDvrPlayback = ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb,
                                                         this->ref<Demux>());
            if (!mDvrPlayback->createDvrMQ()) {
                mDvrPlayback = nullptr;
                *_aidl_return = mDvrPlayback;
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::UNKNOWN_ERROR));
            }

            for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
                if (!mDvrPlayback->addPlaybackFilter(*it, mFilters[*it])) {
                    ALOGE("[Demux] Can't get filter info for DVR playback");
                    mDvrPlayback = nullptr;
                    *_aidl_return = mDvrPlayback;
                    return ::ndk::ScopedAStatus::fromServiceSpecificError(
                            static_cast<int32_t>(Result::UNKNOWN_ERROR));
                }
            }

            *_aidl_return = mDvrPlayback;
            return ::ndk::ScopedAStatus::ok();
        case DvrType::RECORD:
            mDvrRecord = ndk::SharedRefBase::make<Dvr>(in_type, in_bufferSize, in_cb,
                                                       this->ref<Demux>());
            if (!mDvrRecord->createDvrMQ()) {
                mDvrRecord = nullptr;
                *_aidl_return = mDvrRecord;
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::UNKNOWN_ERROR));
            }

            *_aidl_return = mDvrRecord;
            return ::ndk::ScopedAStatus::ok();
        default:
            *_aidl_return = nullptr;
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
    }
}

::ndk::ScopedAStatus Demux::connectCiCam(int32_t in_ciCamId) {
    ALOGV("%s", __FUNCTION__);

+25 −0
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
#pragma once

#include <aidl/android/hardware/tv/tuner/BnDemux.h>
#include <aidl/android/hardware/tv/tuner/BnDvrCallback.h>

#include <fmq/AidlMessageQueue.h>
#include <math.h>
@@ -28,7 +29,9 @@
#include "Filter.h"
#include "Frontend.h"
#include "TimeFilter.h"
#include "Timer.h"
#include "Tuner.h"
#include "dtv_plugin.h"

using namespace std;

@@ -44,6 +47,8 @@ using ::android::AidlMessageQueue;
using ::android::hardware::EventFlag;

using FilterMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;
using AidlMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;
using AidlMQDesc = MQDescriptor<int8_t, SynchronizedReadWrite>;

class Dvr;
class Filter;
@@ -51,6 +56,19 @@ class Frontend;
class TimeFilter;
class Tuner;

class DvrPlaybackCallback : public BnDvrCallback {
  public:
    virtual ::ndk::ScopedAStatus onPlaybackStatus(PlaybackStatus status) override {
        ALOGD("demux.h: playback status %d", status);
        return ndk::ScopedAStatus::ok();
    }

    virtual ::ndk::ScopedAStatus onRecordStatus(RecordStatus status) override {
        ALOGD("Record Status %hhd", status);
        return ndk::ScopedAStatus::ok();
    }
};

class Demux : public BnDemux {
  public:
    Demux(int32_t demuxId, uint32_t filterTypes);
@@ -85,6 +103,8 @@ class Demux : public BnDemux {
    void setIsRecording(bool isRecording);
    bool isRecording();
    void startFrontendInputLoop();
    void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf, size_t size,
                            int timeout_ms, int buffer_timeout);

    /**
     * A dispatcher to read and dispatch input data to all the started filters.
@@ -167,11 +187,16 @@ class Demux : public BnDemux {

    // Thread handlers
    std::thread mFrontendInputThread;
    std::thread mDemuxIptvReadThread;

    // track whether the DVR FMQ for IPTV Playback is full
    bool mIsIptvDvrFMQFull = false;

    /**
     * If a specific filter's writing loop is still running
     */
    std::atomic<bool> mFrontendInputThreadRunning;
    std::atomic<bool> mDemuxIptvReadThreadRunning;
    std::atomic<bool> mKeepFetchingDataFromFrontend;

    /**
+32 −0
Original line number Diff line number Diff line
@@ -236,6 +236,20 @@ void Dvr::playbackThreadLoop() {
    ALOGD("[Dvr] playback thread ended.");
}

void Dvr::maySendIptvPlaybackStatusCallback() {
    lock_guard<mutex> lock(mPlaybackStatusLock);
    int availableToRead = mDvrMQ->availableToRead();
    int availableToWrite = mDvrMQ->availableToWrite();

    PlaybackStatus newStatus = checkPlaybackStatusChange(availableToWrite, availableToRead,
                                                         IPTV_PLAYBACK_STATUS_THRESHOLD_HIGH,
                                                         IPTV_PLAYBACK_STATUS_THRESHOLD_LOW);
    if (mPlaybackStatus != newStatus) {
        mCallback->onPlaybackStatus(newStatus);
        mPlaybackStatus = newStatus;
    }
}

void Dvr::maySendPlaybackStatusCallback() {
    lock_guard<mutex> lock(mPlaybackStatusLock);
    int availableToRead = mDvrMQ->availableToRead();
@@ -443,6 +457,24 @@ bool Dvr::startFilterDispatcher(bool isVirtualFrontend, bool isRecording) {
    return true;
}

int Dvr::writePlaybackFMQ(void* buf, size_t size) {
    lock_guard<mutex> lock(mWriteLock);
    ALOGI("Playback status: %d", mPlaybackStatus);
    if (mPlaybackStatus == PlaybackStatus::SPACE_FULL) {
        ALOGW("[Dvr] stops writing and wait for the client side flushing.");
        return DVR_WRITE_FAILURE_REASON_FMQ_FULL;
    }
    ALOGI("availableToWrite before: %zu", mDvrMQ->availableToWrite());
    if (mDvrMQ->write((int8_t*)buf, size)) {
        mDvrEventFlag->wake(static_cast<uint32_t>(DemuxQueueNotifyBits::DATA_READY));
        ALOGI("availableToWrite: %zu", mDvrMQ->availableToWrite());
        maySendIptvPlaybackStatusCallback();
        return DVR_WRITE_SUCCESS;
    }
    maySendIptvPlaybackStatusCallback();
    return DVR_WRITE_FAILURE_REASON_UNKNOWN;
}

bool Dvr::writeRecordFMQ(const vector<int8_t>& data) {
    lock_guard<mutex> lock(mWriteLock);
    if (mRecordStatus == RecordStatus::OVERFLOW) {
+15 −0
Original line number Diff line number Diff line
@@ -43,6 +43,19 @@ using ::android::hardware::EventFlag;

using DvrMQ = AidlMessageQueue<int8_t, SynchronizedReadWrite>;

const int DVR_WRITE_SUCCESS = 0;
const int DVR_WRITE_FAILURE_REASON_FMQ_FULL = 1;
const int DVR_WRITE_FAILURE_REASON_UNKNOWN = 2;

const int TS_SIZE = 188;
const int IPTV_BUFFER_SIZE = TS_SIZE * 7 * 8;  // defined in service_streamer_udp in cbs v3 project

// Thresholds are defined to indicate how full the buffers are.
const double HIGH_THRESHOLD_PERCENT = 0.90;
const double LOW_THRESHOLD_PERCENT = 0.15;
const int IPTV_PLAYBACK_STATUS_THRESHOLD_HIGH = IPTV_BUFFER_SIZE * HIGH_THRESHOLD_PERCENT;
const int IPTV_PLAYBACK_STATUS_THRESHOLD_LOW = IPTV_BUFFER_SIZE * LOW_THRESHOLD_PERCENT;

struct MediaEsMetaData {
    bool isAudio;
    int startIndex;
@@ -80,6 +93,7 @@ class Dvr : public BnDvr {
     * Return false is any of the above processes fails.
     */
    bool createDvrMQ();
    int writePlaybackFMQ(void* buf, size_t size);
    bool writeRecordFMQ(const std::vector<int8_t>& data);
    bool addPlaybackFilter(int64_t filterId, std::shared_ptr<IFilter> filter);
    bool removePlaybackFilter(int64_t filterId);
@@ -102,6 +116,7 @@ class Dvr : public BnDvr {
    bool readDataFromMQ();
    void getMetaDataValue(int& index, int8_t* dataOutputBuffer, int& value);
    void maySendPlaybackStatusCallback();
    void maySendIptvPlaybackStatusCallback();
    void maySendRecordStatusCallback();
    PlaybackStatus checkPlaybackStatusChange(uint32_t availableToWrite, uint32_t availableToRead,
                                             int64_t highThreshold, int64_t lowThreshold);
Loading