Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 25a0f6fe authored by sadiqsada's avatar sadiqsada
Browse files

Refactor plugin interface, streamer creation

Bug: 288170590
Test: atest VtsHalTvTunerTargetTest
Change-Id: Ib141bbb05df8ce17eb5fdb1fed017110fd46a510
parent 7186eb99
Loading
Loading
Loading
Loading
+32 −25
Original line number Diff line number Diff line
@@ -123,26 +123,28 @@ void Demux::setIptvThreadRunning(bool isIptvThreadRunning) {
    mIsIptvThreadRunningCv.notify_all();
}

void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size,
                               int timeout_ms, int buffer_timeout) {
void Demux::frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer) {
    Timer *timer, *fullBufferTimer;
    while (true) {
        std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex);
        mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; });
        if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) {
            ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout);
        if (mIsIptvDvrFMQFull &&
            fullBufferTimer->get_elapsed_time_ms() > IPTV_PLAYBACK_BUFFER_TIMEOUT) {
            ALOGE("DVR FMQ has not been flushed within timeout of %d ms",
                  IPTV_PLAYBACK_BUFFER_TIMEOUT);
            delete fullBufferTimer;
            break;
        }
        timer = new Timer();
        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
        if (buf == nullptr) ALOGI("Buffer allocation failed");
        ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms);
        ssize_t bytes_read =
                interface->read_stream(streamer, buf, IPTV_BUFFER_SIZE, IPTV_PLAYBACK_TIMEOUT);
        if (bytes_read == 0) {
            double elapsed_time = timer->get_elapsed_time_ms();
            if (elapsed_time > timeout_ms) {
            if (elapsed_time > IPTV_PLAYBACK_TIMEOUT) {
                ALOGE("[Demux] timeout reached - elapsed_time: %f, timeout: %d", elapsed_time,
                      timeout_ms);
                      IPTV_PLAYBACK_TIMEOUT);
            }
            ALOGE("[Demux] Cannot read data from the socket");
            delete timer;
@@ -206,32 +208,37 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, si

        // get plugin interface from frontend
        dtv_plugin* interface = mFrontend->getIptvPluginInterface();
        // if plugin interface is not on frontend, create a new plugin interface
        if (interface == nullptr) {
            ALOGE("[Demux] getIptvPluginInterface(): plugin interface is null");
            interface = mFrontend->createIptvPluginInterface();
            if (interface == nullptr) {
                ALOGE("[   INFO   ] Failed to load plugin.");
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::INVALID_STATE));
            }
        ALOGI("[Demux] getIptvPluginInterface(): plugin interface is not null");
        }

        // get transport description from frontend
        string transport_desc = mFrontend->getIptvTransportDescription();
        if (transport_desc.empty()) {
            string content_url = "rtp://127.0.0.1:12345";
            transport_desc = "{ \"uri\": \"" + content_url + "\"}";
        }
        ALOGI("[Demux] transport_desc: %s", transport_desc.c_str());

        // get streamer object from Frontend instance
        dtv_streamer* streamer = mFrontend->getIptvPluginStreamer();
        if (streamer == nullptr) {
            ALOGE("[Demux] getIptvPluginStreamer(): streamer is null");
            streamer = mFrontend->createIptvPluginStreamer(interface, transport_desc.c_str());
            if (streamer == nullptr) {
                ALOGE("[   INFO   ] Failed to open stream");
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::INVALID_STATE));
            }
        ALOGI("[Demux] getIptvPluginStreamer(): streamer is not null");
        }

        // get transport description from frontend
        string transport_desc = mFrontend->getIptvTransportDescription();
        ALOGI("[Demux] getIptvTransportDescription(): transport_desc: %s", transport_desc.c_str());

        // call read_stream on the socket to populate the buffer with TS data
        // while thread is alive, keep reading data
        int timeout_ms = 20;
        int buffer_timeout = 10000;  // 10s
        mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer,
                                           IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout);
        mDemuxIptvReadThread =
                std::thread(&Demux::frontendIptvInputThreadLoop, this, interface, streamer);
    }
    return ::ndk::ScopedAStatus::ok();
}
+4 −2
Original line number Diff line number Diff line
@@ -56,6 +56,9 @@ class Frontend;
class TimeFilter;
class Tuner;

const int IPTV_PLAYBACK_TIMEOUT = 20;            // ms
const int IPTV_PLAYBACK_BUFFER_TIMEOUT = 20000;  // ms

class DvrPlaybackCallback : public BnDvrCallback {
  public:
    virtual ::ndk::ScopedAStatus onPlaybackStatus(PlaybackStatus status) override {
@@ -103,8 +106,7 @@ class Demux : public BnDemux {
    void setIsRecording(bool isRecording);
    bool isRecording();
    void startFrontendInputLoop();
    void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size,
                            int timeout_ms, int buffer_timeout);
    void frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer);

    /**
     * A dispatcher to read and dispatch input data to all the started filters.
+43 −34
Original line number Diff line number Diff line
@@ -215,8 +215,31 @@ Frontend::~Frontend() {
    return ::ndk::ScopedAStatus::ok();
}

void Frontend::readTuneByte(dtv_streamer* streamer, void* buf, size_t buf_size, int timeout_ms) {
    ssize_t bytes_read = mIptvPluginInterface->read_stream(streamer, buf, buf_size, timeout_ms);
dtv_plugin* Frontend::createIptvPluginInterface() {
    const char* path = "/vendor/lib/iptv_udp_plugin.so";
    DtvPlugin* plugin = new DtvPlugin(path);
    bool plugin_loaded = plugin->load();
    if (!plugin_loaded) {
        ALOGE("Failed to load plugin");
        return nullptr;
    }
    return plugin->interface();
}

dtv_streamer* Frontend::createIptvPluginStreamer(dtv_plugin* interface,
                                                 const char* transport_desc) {
    dtv_streamer* streamer = interface->create_streamer();
    int open_fd = interface->open_stream(streamer, transport_desc);
    if (open_fd < 0) {
        return nullptr;
    }
    ALOGI("[   INFO   ] open_stream successful, open_fd=%d", open_fd);
    return streamer;
}

void Frontend::readTuneByte(void* buf) {
    ssize_t bytes_read = mIptvPluginInterface->read_stream(mIptvPluginStreamer, buf,
                                                           TUNE_BUFFER_SIZE, TUNE_BUFFER_TIMEOUT);
    if (bytes_read <= 0) {
        ALOGI("[   ERROR   ] Tune byte couldn't be read.");
        return;
@@ -242,53 +265,39 @@ void Frontend::readTuneByte(dtv_streamer* streamer, void* buf, size_t buf_size,
        ALOGI("[   INFO   ] Frontend type is set to IPTV, tag = %d id=%d", in_settings.getTag(),
              mId);

        // load udp plugin for reading TS data
        const char* path = "/vendor/lib/iptv_udp_plugin.so";
        DtvPlugin* plugin = new DtvPlugin(path);
        if (!plugin) {
            ALOGE("Failed to create DtvPlugin, plugin_path is invalid");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
        }
        bool plugin_loaded = plugin->load();
        if (!plugin_loaded) {
            ALOGE("Failed to load plugin");
        mIptvPluginInterface = createIptvPluginInterface();
        if (mIptvPluginInterface == nullptr) {
            ALOGE("[   INFO   ] Failed to load plugin.");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
        }
        mIptvPluginInterface = plugin->interface();

        // validate content_url format
        std::string content_url = in_settings.get<FrontendSettings::Tag::iptv>()->contentUrl;
        std::string transport_desc = "{ \"uri\": \"" + content_url + "\"}";
        ALOGI("[   INFO   ] transport_desc: %s", transport_desc.c_str());
        bool is_transport_desc_valid = plugin->validate(transport_desc.c_str());
        mIptvTransportDescription = "{ \"uri\": \"" + content_url + "\"}";
        ALOGI("[   INFO   ] transport_desc: %s", mIptvTransportDescription.c_str());
        bool is_transport_desc_valid =
                mIptvPluginInterface->validate(mIptvTransportDescription.c_str());
        if (!is_transport_desc_valid) {  // not of format protocol://ip:port
            ALOGE("[   INFO   ] transport_desc is not valid");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
        }
        mIptvTransportDescription = transport_desc;

        // create a streamer and open it for reading data
        dtv_streamer* streamer = mIptvPluginInterface->create_streamer();
        mIptvPluginStreamer = streamer;
        int open_fd = mIptvPluginInterface->open_stream(streamer, transport_desc.c_str());
        if (open_fd < 0) {
            ALOGE("[   INFO   ] could not open stream");
        mIptvPluginStreamer =
                createIptvPluginStreamer(mIptvPluginInterface, mIptvTransportDescription.c_str());

        void* buf = malloc(sizeof(char) * TUNE_BUFFER_SIZE);
        if (buf == nullptr) {
            ALOGE("Failed to allocate 1 byte buffer for tuning.");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
                    static_cast<int32_t>(Result::INVALID_STATE));
        }
        mIptvFrontendTuneThread = std::thread(&Frontend::readTuneByte, this, buf);
        if (mIptvFrontendTuneThread.joinable()) {
            mIptvFrontendTuneThread.join();
        }
        ALOGI("[   INFO   ] open_stream successful, open_fd=%d", open_fd);

        size_t buf_size = 1;
        int timeout_ms = 2000;
        void* buf = malloc(sizeof(char) * buf_size);
        if (buf == nullptr) ALOGI("malloc buf failed [TUNE]");
        ALOGI("[   INFO   ] [Tune] Allocated buffer of size %zu", buf_size);
        mIptvFrontendTuneThread =
                std::thread(&Frontend::readTuneByte, this, streamer, buf, buf_size, timeout_ms);
        if (mIptvFrontendTuneThread.joinable()) mIptvFrontendTuneThread.join();
        free(buf);
    }

+6 −1
Original line number Diff line number Diff line
@@ -33,6 +33,9 @@ namespace tuner {

class Tuner;

const int TUNE_BUFFER_SIZE = 1;        // byte
const int TUNE_BUFFER_TIMEOUT = 2000;  // ms

class Frontend : public BnFrontend {
  public:
    Frontend(FrontendType type, int32_t id);
@@ -64,7 +67,9 @@ class Frontend : public BnFrontend {
    dtv_plugin* getIptvPluginInterface();
    string getIptvTransportDescription();
    dtv_streamer* getIptvPluginStreamer();
    void readTuneByte(dtv_streamer* streamer, void* buf, size_t size, int timeout_ms);
    void readTuneByte(void* buf);
    dtv_streamer* createIptvPluginStreamer(dtv_plugin* interface, const char* transport_desc);
    dtv_plugin* createIptvPluginInterface();
    bool isLocked();
    void getFrontendInfo(FrontendInfo* _aidl_return);
    void setTunerService(std::shared_ptr<Tuner> tuner);