Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e7f0fab8 authored by TreeHugger Robot's avatar TreeHugger Robot Committed by Android (Google) Code Review
Browse files

Merge "codec2: fix SimpleC2Component race condition"

parents d4abbab1 e3534403
Loading
Loading
Loading
Loading
+72 −18
Original line number Diff line number Diff line
@@ -51,6 +51,8 @@ void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
    mQueue.push_back({ nullptr, drainMode });
}

////////////////////////////////////////////////////////////////////////////////

SimpleC2Component::SimpleC2Component(
        const std::shared_ptr<C2ComponentInterface> &intf)
    : mIntf(intf) {
@@ -96,8 +98,8 @@ c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &ite
}

c2_status_t SimpleC2Component::flush_sm(
        flush_mode_t flushThrough, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
    (void) flushThrough;
        flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
    (void)flushMode;
    {
        Mutexed<ExecState>::Locked state(mExecState);
        if (state->mState != RUNNING) {
@@ -107,6 +109,7 @@ c2_status_t SimpleC2Component::flush_sm(
    {
        Mutexed<WorkQueue>::Locked queue(mWorkQueue);
        queue->incGeneration();
        // TODO: queue->splicedBy(flushedWork, flushedWork->end());
        while (!queue->empty()) {
            std::unique_ptr<C2Work> work = queue->pop_front();
            if (work) {
@@ -122,7 +125,7 @@ c2_status_t SimpleC2Component::flush_sm(
        }
    }

    return onFlush_sm();
    return C2_OK;
}

c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
@@ -160,6 +163,10 @@ c2_status_t SimpleC2Component::start() {
    }
    if (!state->mThread.joinable()) {
        mExitRequested = false;
        {
            Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
            monitor->mExited = false;
        }
        state->mThread = std::thread(
                [](std::weak_ptr<SimpleC2Component> wp) {
                    while (true) {
@@ -168,6 +175,8 @@ c2_status_t SimpleC2Component::start() {
                            return;
                        }
                        if (thiz->exitRequested()) {
                            ALOGV("stop processing");
                            thiz->signalExit();
                            return;
                        }
                        thiz->processQueue();
@@ -179,7 +188,42 @@ c2_status_t SimpleC2Component::start() {
    return C2_OK;
}

void SimpleC2Component::signalExit() {
    Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
    monitor->mExited = true;
    monitor->mCondition.broadcast();
}

void SimpleC2Component::requestExitAndWait(std::function<void()> job) {
    {
        Mutexed<ExecState>::Locked state(mExecState);
        if (!state->mThread.joinable()) {
            return;
        }
    }
    mExitRequested = true;
    {
        Mutexed<WorkQueue>::Locked queue(mWorkQueue);
        queue->mCondition.broadcast();
    }
    // TODO: timeout?
    {
        Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
        while (!monitor->mExited) {
            monitor.waitForCondition(monitor->mCondition);
        }
        job();
    }
    Mutexed<ExecState>::Locked state(mExecState);
    if (state->mThread.joinable()) {
        ALOGV("joining the processing thread");
        state->mThread.join();
        ALOGV("joined the processing thread");
    }
}

c2_status_t SimpleC2Component::stop() {
    ALOGV("stop");
    {
        Mutexed<ExecState>::Locked state(mExecState);
        if (state->mState != RUNNING) {
@@ -195,7 +239,8 @@ c2_status_t SimpleC2Component::stop() {
        Mutexed<PendingWork>::Locked pending(mPendingWork);
        pending->clear();
    }
    c2_status_t err = onStop();
    c2_status_t err;
    requestExitAndWait([this, &err]{ err = onStop(); });
    if (err != C2_OK) {
        return err;
    }
@@ -203,6 +248,7 @@ c2_status_t SimpleC2Component::stop() {
}

c2_status_t SimpleC2Component::reset() {
    ALOGV("reset");
    {
        Mutexed<ExecState>::Locked state(mExecState);
        state->mState = UNINITIALIZED;
@@ -215,21 +261,13 @@ c2_status_t SimpleC2Component::reset() {
        Mutexed<PendingWork>::Locked pending(mPendingWork);
        pending->clear();
    }
    onReset();
    requestExitAndWait([this]{ onReset(); });
    return C2_OK;
}

c2_status_t SimpleC2Component::release() {
    std::thread releasing;
    {
        Mutexed<ExecState>::Locked state(mExecState);
        releasing = std::move(state->mThread);
    }
    mExitRequested = true;
    if (releasing.joinable()) {
        releasing.join();
    }
    onRelease();
    ALOGV("release");
    requestExitAndWait([this]{ onRelease(); });
    return C2_OK;
}

@@ -271,10 +309,14 @@ void SimpleC2Component::processQueue() {
    std::unique_ptr<C2Work> work;
    uint64_t generation;
    int32_t drainMode;
    bool isFlushPending = false;
    {
        Mutexed<WorkQueue>::Locked queue(mWorkQueue);
        nsecs_t deadline = systemTime() + ms2ns(250);
        while (queue->empty()) {
            if (exitRequested()) {
                return;
            }
            nsecs_t now = systemTime();
            if (now >= deadline) {
                return;
@@ -287,8 +329,17 @@ void SimpleC2Component::processQueue() {

        generation = queue->generation();
        drainMode = queue->drainMode();
        isFlushPending = queue->popPendingFlush();
        work = queue->pop_front();
    }
    if (isFlushPending) {
        ALOGV("processing pending flush");
        c2_status_t err = onFlush_sm();
        if (err != C2_OK) {
            ALOGD("flush err: %d", err);
            // TODO: error
        }
    }

    if (!mOutputBlockPool) {
        c2_status_t err = [this] {
@@ -333,10 +384,12 @@ void SimpleC2Component::processQueue() {
    }

    process(work, mOutputBlockPool);
    ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
    {
        Mutexed<WorkQueue>::Locked queue(mWorkQueue);
        if (queue->generation() != generation) {
            ALOGW("work form old generation: was %" PRIu64 " now %" PRIu64, queue->generation(), generation);
            ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
                    queue->generation(), generation);
            work->result = C2_NOT_FOUND;
            queue.unlock();
            {
@@ -364,6 +417,7 @@ void SimpleC2Component::processQueue() {
            (void)pending->insert({ frameIndex, std::move(work) });
        }
        if (unexpected) {
            ALOGD("unexpected pending work");
            unexpected->result = C2_CORRUPTED;
            Mutexed<ExecState>::Locked state(mExecState);
            state->mListener->onWorkDone_nb(shared_from_this(), vec(unexpected));
+18 −2
Original line number Diff line number Diff line
@@ -52,6 +52,7 @@ public:
    // for thread
    inline bool exitRequested() { return mExitRequested; }
    void processQueue();
    void signalExit();

protected:
    /**
@@ -157,16 +158,21 @@ private:

    class WorkQueue {
    public:
        inline WorkQueue() : mGeneration(0ul) {}
        inline WorkQueue() : mFlush(false), mGeneration(0ul) {}

        inline uint64_t generation() const { return mGeneration; }
        inline void incGeneration() { ++mGeneration; }
        inline void incGeneration() { ++mGeneration; mFlush = true; }

        std::unique_ptr<C2Work> pop_front();
        void push_back(std::unique_ptr<C2Work> work);
        bool empty() const;
        uint32_t drainMode() const;
        void markDrain(uint32_t drainMode);
        inline bool popPendingFlush() {
            bool flush = mFlush;
            mFlush = false;
            return flush;
        }
        void clear();

        Condition mCondition;
@@ -177,6 +183,7 @@ private:
            uint32_t drainMode;
        };

        bool mFlush;
        uint64_t mGeneration;
        std::list<Entry> mQueue;
    };
@@ -185,9 +192,18 @@ private:
    typedef std::unordered_map<uint64_t, std::unique_ptr<C2Work>> PendingWork;
    Mutexed<PendingWork> mPendingWork;

    struct ExitMonitor {
        inline ExitMonitor() : mExited(false) {}
        Condition mCondition;
        bool mExited;
    };
    Mutexed<ExitMonitor> mExitMonitor;

    std::shared_ptr<C2BlockPool> mOutputBlockPool;

    SimpleC2Component() = delete;

    void requestExitAndWait(std::function<void()> job);
};

}  // namespace android