Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ee6e1f9c authored by Sadiq Sada's avatar Sadiq Sada Committed by Android (Google) Code Review
Browse files

Merge changes from topic "iptv_vts_fix" into main

* changes:
  Reenable VTS tests for IPTV
  Terminate IPTV read thread on demux close
  Fix tune byte alignment
  mFilterCount cannot be negative
  Refactor plugin interface, streamer creation
parents 9db9570e db9c51aa
Loading
Loading
Loading
Loading
+67 −33
Original line number Original line Diff line number Diff line
@@ -53,9 +53,6 @@ void Demux::setTunerService(std::shared_ptr<Tuner> tuner) {


Demux::~Demux() {
Demux::~Demux() {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);
    if (mDemuxIptvReadThread.joinable()) {
        mDemuxIptvReadThread.join();
    }
    close();
    close();
}
}


@@ -123,26 +120,43 @@ void Demux::setIptvThreadRunning(bool isIptvThreadRunning) {
    mIsIptvThreadRunningCv.notify_all();
    mIsIptvThreadRunningCv.notify_all();
}
}


void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t buf_size,
void Demux::frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf) {
                               int timeout_ms, int buffer_timeout) {
    Timer *timer, *fullBufferTimer;
    Timer *timer, *fullBufferTimer;
    bool isTuneBytePushedToDvr = false;
    while (true) {
    while (true) {
        std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex);
        std::unique_lock<std::mutex> lock(mIsIptvThreadRunningMutex);
        mIsIptvThreadRunningCv.wait(lock, [this] { return mIsIptvReadThreadRunning; });
        mIsIptvThreadRunningCv.wait(
        if (mIsIptvDvrFMQFull && fullBufferTimer->get_elapsed_time_ms() > buffer_timeout) {
                lock, [this] { return mIsIptvReadThreadRunning || mIsIptvReadThreadTerminated; });
            ALOGE("DVR FMQ has not been flushed within timeout of %d ms", buffer_timeout);
        if (mIsIptvReadThreadTerminated) {
            ALOGI("[Demux] IPTV reading thread for playback terminated");
            break;
        }
        if (mIsIptvDvrFMQFull &&
            fullBufferTimer->get_elapsed_time_ms() > IPTV_PLAYBACK_BUFFER_TIMEOUT) {
            ALOGE("DVR FMQ has not been flushed within timeout of %d ms",
                  IPTV_PLAYBACK_BUFFER_TIMEOUT);
            delete fullBufferTimer;
            delete fullBufferTimer;
            break;
            break;
        }
        }
        timer = new Timer();
        timer = new Timer();
        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
        ssize_t bytes_read;
        if (buf == nullptr) ALOGI("Buffer allocation failed");
        void* tuneByteBuffer = mFrontend->getTuneByteBuffer();
        ssize_t bytes_read = interface->read_stream(streamer, buf, buf_size, timeout_ms);
        if (!isTuneBytePushedToDvr && tuneByteBuffer != nullptr) {
        if (bytes_read == 0) {
            memcpy(buf, tuneByteBuffer, 1);
            char* offsetBuf = (char*)buf + 1;
            bytes_read = interface->read_stream(streamer, (void*)offsetBuf, IPTV_BUFFER_SIZE - 1,
                                                IPTV_PLAYBACK_TIMEOUT);
            isTuneBytePushedToDvr = true;
        } else {
            bytes_read =
                    interface->read_stream(streamer, buf, IPTV_BUFFER_SIZE, IPTV_PLAYBACK_TIMEOUT);
        }

        if (bytes_read <= 0) {
            double elapsed_time = timer->get_elapsed_time_ms();
            double elapsed_time = timer->get_elapsed_time_ms();
            if (elapsed_time > timeout_ms) {
            if (elapsed_time > IPTV_PLAYBACK_TIMEOUT) {
                ALOGE("[Demux] timeout reached - elapsed_time: %f, timeout: %d", elapsed_time,
                ALOGE("[Demux] timeout reached - elapsed_time: %f, timeout: %d", elapsed_time,
                      timeout_ms);
                      IPTV_PLAYBACK_TIMEOUT);
            }
            }
            ALOGE("[Demux] Cannot read data from the socket");
            ALOGE("[Demux] Cannot read data from the socket");
            delete timer;
            delete timer;
@@ -170,8 +184,6 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, si
            default:
            default:
                ALOGI("Invalid DVR Status");
                ALOGI("Invalid DVR Status");
        }
        }

        free(buf);
    }
    }
}
}


@@ -206,32 +218,44 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, si


        // get plugin interface from frontend
        // get plugin interface from frontend
        dtv_plugin* interface = mFrontend->getIptvPluginInterface();
        dtv_plugin* interface = mFrontend->getIptvPluginInterface();
        // if plugin interface is not on frontend, create a new plugin interface
        if (interface == nullptr) {
            interface = mFrontend->createIptvPluginInterface();
            if (interface == nullptr) {
            if (interface == nullptr) {
            ALOGE("[Demux] getIptvPluginInterface(): plugin interface is null");
                ALOGE("[   INFO   ] Failed to load plugin.");
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::INVALID_STATE));
                        static_cast<int32_t>(Result::INVALID_STATE));
            }
            }
        ALOGI("[Demux] getIptvPluginInterface(): plugin interface is not null");
        }

        // get transport description from frontend
        string transport_desc = mFrontend->getIptvTransportDescription();
        if (transport_desc.empty()) {
            string content_url = "rtp://127.0.0.1:12345";
            transport_desc = "{ \"uri\": \"" + content_url + "\"}";
        }
        ALOGI("[Demux] transport_desc: %s", transport_desc.c_str());


        // get streamer object from Frontend instance
        // get streamer object from Frontend instance
        dtv_streamer* streamer = mFrontend->getIptvPluginStreamer();
        dtv_streamer* streamer = mFrontend->getIptvPluginStreamer();
        if (streamer == nullptr) {
        if (streamer == nullptr) {
            ALOGE("[Demux] getIptvPluginStreamer(): streamer is null");
            streamer = mFrontend->createIptvPluginStreamer(interface, transport_desc.c_str());
            if (streamer == nullptr) {
                ALOGE("[   INFO   ] Failed to open stream");
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                return ::ndk::ScopedAStatus::fromServiceSpecificError(
                        static_cast<int32_t>(Result::INVALID_STATE));
                        static_cast<int32_t>(Result::INVALID_STATE));
            }
            }
        ALOGI("[Demux] getIptvPluginStreamer(): streamer is not null");
        }

        stopIptvFrontendInput();
        // get transport description from frontend
        mIsIptvReadThreadTerminated = false;
        string transport_desc = mFrontend->getIptvTransportDescription();
        void* buf = malloc(sizeof(char) * IPTV_BUFFER_SIZE);
        ALOGI("[Demux] getIptvTransportDescription(): transport_desc: %s", transport_desc.c_str());
        if (buf == nullptr) {

            ALOGE("[Demux] Buffer allocation failed");
        // call read_stream on the socket to populate the buffer with TS data
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
        // while thread is alive, keep reading data
                    static_cast<int32_t>(Result::INVALID_STATE));
        int timeout_ms = 20;
        }
        int buffer_timeout = 10000;  // 10s
        mDemuxIptvReadThread =
        mDemuxIptvReadThread = std::thread(&Demux::readIptvThreadLoop, this, interface, streamer,
                std::thread(&Demux::frontendIptvInputThreadLoop, this, interface, streamer, buf);
                                           IPTV_BUFFER_SIZE, timeout_ms, buffer_timeout);
    }
    }
    return ::ndk::ScopedAStatus::ok();
    return ::ndk::ScopedAStatus::ok();
}
}
@@ -348,6 +372,7 @@ void Demux::readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, si
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    stopFrontendInput();
    stopFrontendInput();
    stopIptvFrontendInput();


    set<int64_t>::iterator it;
    set<int64_t>::iterator it;
    for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
    for (it = mPlaybackFilterIds.begin(); it != mPlaybackFilterIds.end(); it++) {
@@ -543,6 +568,15 @@ void Demux::stopFrontendInput() {
    }
    }
}
}


void Demux::stopIptvFrontendInput() {
    ALOGD("[Demux] stop iptv frontend on demux");
    if (mDemuxIptvReadThread.joinable()) {
        mIsIptvReadThreadTerminated = true;
        mIsIptvThreadRunningCv.notify_all();
        mDemuxIptvReadThread.join();
    }
}

void Demux::setIsRecording(bool isRecording) {
void Demux::setIsRecording(bool isRecording) {
    mIsRecording = isRecording;
    mIsRecording = isRecording;
}
}
+10 −3
Original line number Original line Diff line number Diff line
@@ -56,6 +56,9 @@ class Frontend;
class TimeFilter;
class TimeFilter;
class Tuner;
class Tuner;


const int IPTV_PLAYBACK_TIMEOUT = 20;            // ms
const int IPTV_PLAYBACK_BUFFER_TIMEOUT = 20000;  // ms

class DvrPlaybackCallback : public BnDvrCallback {
class DvrPlaybackCallback : public BnDvrCallback {
  public:
  public:
    virtual ::ndk::ScopedAStatus onPlaybackStatus(PlaybackStatus status) override {
    virtual ::ndk::ScopedAStatus onPlaybackStatus(PlaybackStatus status) override {
@@ -103,8 +106,7 @@ class Demux : public BnDemux {
    void setIsRecording(bool isRecording);
    void setIsRecording(bool isRecording);
    bool isRecording();
    bool isRecording();
    void startFrontendInputLoop();
    void startFrontendInputLoop();
    void readIptvThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, size_t size,
    void frontendIptvInputThreadLoop(dtv_plugin* interface, dtv_streamer* streamer, void* buf);
                            int timeout_ms, int buffer_timeout);


    /**
    /**
     * A dispatcher to read and dispatch input data to all the started filters.
     * A dispatcher to read and dispatch input data to all the started filters.
@@ -128,6 +130,10 @@ class Demux : public BnDemux {
     * Setter for IPTV Reading thread
     * Setter for IPTV Reading thread
     */
     */
    void setIptvThreadRunning(bool isIptvThreadRunning);
    void setIptvThreadRunning(bool isIptvThreadRunning);
    /**
     * Stops IPTV playback reading thread.
     */
    void stopIptvFrontendInput();


  private:
  private:
    // Tuner service
    // Tuner service
@@ -206,7 +212,8 @@ class Demux : public BnDemux {
    /**
    /**
     * Controls IPTV reading thread status
     * Controls IPTV reading thread status
     */
     */
    bool mIsIptvReadThreadRunning;
    bool mIsIptvReadThreadRunning = false;
    std::atomic<bool> mIsIptvReadThreadTerminated = false;
    std::mutex mIsIptvThreadRunningMutex;
    std::mutex mIsIptvThreadRunningMutex;
    std::condition_variable mIsIptvThreadRunningCv;
    std::condition_variable mIsIptvThreadRunningCv;


+5 −3
Original line number Original line Diff line number Diff line
@@ -366,10 +366,12 @@ Filter::~Filter() {
::ndk::ScopedAStatus Filter::stop() {
::ndk::ScopedAStatus Filter::stop() {
    ALOGV("%s", __FUNCTION__);
    ALOGV("%s", __FUNCTION__);


    if (mFilterCount > 0) {
        mFilterCount -= 1;
        mFilterCount -= 1;
    if (mFilterCount == 0) {
        if (mFilterCount.load() == 0) {
            mDemux->setIptvThreadRunning(false);
            mDemux->setIptvThreadRunning(false);
        }
        }
    }


    mFilterThreadRunning = false;
    mFilterThreadRunning = false;
    if (mFilterThread.joinable()) {
    if (mFilterThread.joinable()) {
+48 −36
Original line number Original line Diff line number Diff line
@@ -188,6 +188,9 @@ Frontend::~Frontend() {
    mCallback = nullptr;
    mCallback = nullptr;
    mIsLocked = false;
    mIsLocked = false;
    mTuner = nullptr;
    mTuner = nullptr;
    if (mTuneByteBuffer != nullptr) {
        free(mTuneByteBuffer);
    }
}
}


::ndk::ScopedAStatus Frontend::close() {
::ndk::ScopedAStatus Frontend::close() {
@@ -215,19 +218,43 @@ Frontend::~Frontend() {
    return ::ndk::ScopedAStatus::ok();
    return ::ndk::ScopedAStatus::ok();
}
}


void Frontend::readTuneByte(dtv_streamer* streamer, void* buf, size_t buf_size, int timeout_ms) {
dtv_plugin* Frontend::createIptvPluginInterface() {
    ssize_t bytes_read = mIptvPluginInterface->read_stream(streamer, buf, buf_size, timeout_ms);
    const char* path = "/vendor/lib/iptv_udp_plugin.so";
    DtvPlugin* plugin = new DtvPlugin(path);
    bool plugin_loaded = plugin->load();
    if (!plugin_loaded) {
        ALOGE("Failed to load plugin");
        return nullptr;
    }
    return plugin->interface();
}

dtv_streamer* Frontend::createIptvPluginStreamer(dtv_plugin* interface,
                                                 const char* transport_desc) {
    dtv_streamer* streamer = interface->create_streamer();
    int open_fd = interface->open_stream(streamer, transport_desc);
    if (open_fd < 0) {
        return nullptr;
    }
    ALOGI("[   INFO   ] open_stream successful, open_fd=%d", open_fd);
    return streamer;
}

void Frontend::readTuneByte(void* buf) {
    ssize_t bytes_read = mIptvPluginInterface->read_stream(mIptvPluginStreamer, buf,
                                                           TUNE_BUFFER_SIZE, TUNE_BUFFER_TIMEOUT);
    if (bytes_read <= 0) {
    if (bytes_read <= 0) {
        ALOGI("[   ERROR   ] Tune byte couldn't be read.");
        ALOGI("[   ERROR   ] Tune byte couldn't be read.");
        return;
        return;
    }
    }
    mCallback->onEvent(FrontendEventType::LOCKED);
    mCallback->onEvent(FrontendEventType::LOCKED);
    mIsLocked = true;
    mIsLocked = true;
    mTuneByteBuffer = buf;
}
}


::ndk::ScopedAStatus Frontend::tune(const FrontendSettings& in_settings) {
::ndk::ScopedAStatus Frontend::tune(const FrontendSettings& in_settings) {
    if (mCallback == nullptr) {
    if (mCallback == nullptr) {
        ALOGW("[   WARN   ] Frontend callback is not set for tunin0g");
        ALOGW("[   WARN   ] Frontend callback is not set for tuning");
        return ::ndk::ScopedAStatus::fromServiceSpecificError(
        return ::ndk::ScopedAStatus::fromServiceSpecificError(
                static_cast<int32_t>(Result::INVALID_STATE));
                static_cast<int32_t>(Result::INVALID_STATE));
    }
    }
@@ -242,54 +269,39 @@ void Frontend::readTuneByte(dtv_streamer* streamer, void* buf, size_t buf_size,
        ALOGI("[   INFO   ] Frontend type is set to IPTV, tag = %d id=%d", in_settings.getTag(),
        ALOGI("[   INFO   ] Frontend type is set to IPTV, tag = %d id=%d", in_settings.getTag(),
              mId);
              mId);


        // load udp plugin for reading TS data
        mIptvPluginInterface = createIptvPluginInterface();
        const char* path = "/vendor/lib/iptv_udp_plugin.so";
        if (mIptvPluginInterface == nullptr) {
        DtvPlugin* plugin = new DtvPlugin(path);
            ALOGE("[   INFO   ] Failed to load plugin.");
        if (!plugin) {
            ALOGE("Failed to create DtvPlugin, plugin_path is invalid");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
        }
        bool plugin_loaded = plugin->load();
        if (!plugin_loaded) {
            ALOGE("Failed to load plugin");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
        }
        }
        mIptvPluginInterface = plugin->interface();


        // validate content_url format
        // validate content_url format
        std::string content_url = in_settings.get<FrontendSettings::Tag::iptv>()->contentUrl;
        std::string content_url = in_settings.get<FrontendSettings::Tag::iptv>()->contentUrl;
        std::string transport_desc = "{ \"uri\": \"" + content_url + "\"}";
        mIptvTransportDescription = "{ \"uri\": \"" + content_url + "\"}";
        ALOGI("[   INFO   ] transport_desc: %s", transport_desc.c_str());
        ALOGI("[   INFO   ] transport_desc: %s", mIptvTransportDescription.c_str());
        bool is_transport_desc_valid = plugin->validate(transport_desc.c_str());
        bool is_transport_desc_valid =
                mIptvPluginInterface->validate(mIptvTransportDescription.c_str());
        if (!is_transport_desc_valid) {  // not of format protocol://ip:port
        if (!is_transport_desc_valid) {  // not of format protocol://ip:port
            ALOGE("[   INFO   ] transport_desc is not valid");
            ALOGE("[   INFO   ] transport_desc is not valid");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
        }
        }
        mIptvTransportDescription = transport_desc;


        // create a streamer and open it for reading data
        // create a streamer and open it for reading data
        dtv_streamer* streamer = mIptvPluginInterface->create_streamer();
        mIptvPluginStreamer =
        mIptvPluginStreamer = streamer;
                createIptvPluginStreamer(mIptvPluginInterface, mIptvTransportDescription.c_str());
        int open_fd = mIptvPluginInterface->open_stream(streamer, transport_desc.c_str());

        if (open_fd < 0) {
        void* buf = malloc(sizeof(char) * TUNE_BUFFER_SIZE);
            ALOGE("[   INFO   ] could not open stream");
        if (buf == nullptr) {
            ALOGE("Failed to allocate 1 byte buffer for tuning.");
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
            return ::ndk::ScopedAStatus::fromServiceSpecificError(
                    static_cast<int32_t>(Result::INVALID_ARGUMENT));
                    static_cast<int32_t>(Result::INVALID_STATE));
        }
        mIptvFrontendTuneThread = std::thread(&Frontend::readTuneByte, this, buf);
        if (mIptvFrontendTuneThread.joinable()) {
            mIptvFrontendTuneThread.join();
        }
        }
        ALOGI("[   INFO   ] open_stream successful, open_fd=%d", open_fd);

        size_t buf_size = 1;
        int timeout_ms = 2000;
        void* buf = malloc(sizeof(char) * buf_size);
        if (buf == nullptr) ALOGI("malloc buf failed [TUNE]");
        ALOGI("[   INFO   ] [Tune] Allocated buffer of size %zu", buf_size);
        mIptvFrontendTuneThread =
                std::thread(&Frontend::readTuneByte, this, streamer, buf, buf_size, timeout_ms);
        if (mIptvFrontendTuneThread.joinable()) mIptvFrontendTuneThread.join();
        free(buf);
    }
    }


    return ::ndk::ScopedAStatus::ok();
    return ::ndk::ScopedAStatus::ok();
+8 −1
Original line number Original line Diff line number Diff line
@@ -33,6 +33,9 @@ namespace tuner {


class Tuner;
class Tuner;


const int TUNE_BUFFER_SIZE = 1;        // byte
const int TUNE_BUFFER_TIMEOUT = 2000;  // ms

class Frontend : public BnFrontend {
class Frontend : public BnFrontend {
  public:
  public:
    Frontend(FrontendType type, int32_t id);
    Frontend(FrontendType type, int32_t id);
@@ -64,7 +67,10 @@ class Frontend : public BnFrontend {
    dtv_plugin* getIptvPluginInterface();
    dtv_plugin* getIptvPluginInterface();
    string getIptvTransportDescription();
    string getIptvTransportDescription();
    dtv_streamer* getIptvPluginStreamer();
    dtv_streamer* getIptvPluginStreamer();
    void readTuneByte(dtv_streamer* streamer, void* buf, size_t size, int timeout_ms);
    void readTuneByte(void* buf);
    void* getTuneByteBuffer() { return mTuneByteBuffer; };
    dtv_streamer* createIptvPluginStreamer(dtv_plugin* interface, const char* transport_desc);
    dtv_plugin* createIptvPluginInterface();
    bool isLocked();
    bool isLocked();
    void getFrontendInfo(FrontendInfo* _aidl_return);
    void getFrontendInfo(FrontendInfo* _aidl_return);
    void setTunerService(std::shared_ptr<Tuner> tuner);
    void setTunerService(std::shared_ptr<Tuner> tuner);
@@ -90,6 +96,7 @@ class Frontend : public BnFrontend {
    string mIptvTransportDescription;
    string mIptvTransportDescription;
    dtv_streamer* mIptvPluginStreamer;
    dtv_streamer* mIptvPluginStreamer;
    std::thread mIptvFrontendTuneThread;
    std::thread mIptvFrontendTuneThread;
    void* mTuneByteBuffer = nullptr;
};
};


}  // namespace tuner
}  // namespace tuner
Loading