Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6280ac14 authored by Orhan Uysal's avatar Orhan Uysal Committed by Android (Google) Code Review
Browse files

Merge "Add WorkSerializer class to WMShell" into main

parents 9843d1c3 fa09240f
Loading
Loading
Loading
Loading
+91 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2025 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.wm.shell.common

import com.android.internal.protolog.ProtoLog
import com.android.wm.shell.protolog.ShellProtoLogGroup.WM_SHELL
import kotlin.coroutines.cancellation.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelResult
import kotlinx.coroutines.launch

/**
 * A queue that executes coroutines sequentially in a First-In, First-Out (FIFO) order.
 *
 * @param scope The [CoroutineScope] the queue will use to launch its worker. Cancelling this scope
 * will terminate the queue.
 */
class WorkSerializer(
    scope: CoroutineScope,
    capacity: Int = Channel.Factory.UNLIMITED,
    overflowStrategy: BufferOverflow = BufferOverflow.SUSPEND,
) {
    private val channel =
        Channel<suspend () -> Unit>(
            capacity = capacity,
            onBufferOverflow = overflowStrategy,
            onUndeliveredElement = { onUndeliveredElement() })

    init {
        // The single worker coroutine that processes the queue.
        scope.launch {
            for (work in channel) {
                try {
                    work()
                } catch (e: CancellationException) {
                    ProtoLog.w(
                        WM_SHELL, "CoroutineQueue got cancelled %s",
                        e.printStackTrace()
                    )
                } catch (e: Throwable) {
                    ProtoLog.e(
                        WM_SHELL, "Error in CoroutineQueue %s",
                        e.printStackTrace()
                    )
                }
            }
        }
    }

    /**
     * Adds a new coroutine block to the queue to be executed.
     *
     * @param work The suspendable lambda to execute.
     */
    fun post(work: suspend () -> Unit): ChannelResult<Unit> {
        val result = channel.trySend(work)
        if (result.isFailure) {
            ProtoLog.w(
                WM_SHELL,
                "Failed to post work to CoroutineQueue %s",
                result.exceptionOrNull()?.stackTraceToString()
            )
        }
        return result
    }

    /**
     * Closes the queue to new work. Pending work will be completed.
     */
    fun close() = channel.close()

    fun onUndeliveredElement() {
        ProtoLog.w(WM_SHELL, "An element in CoroutineQueue was undelivered")
    }
}
 No newline at end of file
+126 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2025 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.wm.shell.common

import android.testing.AndroidTestingRunner
import androidx.test.filters.SmallTest
import com.android.wm.shell.ShellTestCase
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.runTest
import org.junit.Test
import org.junit.runner.RunWith

@SmallTest
@RunWith(AndroidTestingRunner::class)
@ExperimentalCoroutinesApi
class WorkSerializerTest : ShellTestCase() {

    /**
     * Tests that jobs are executed sequentially in the order they are posted (FIFO),
     * regardless of their internal delays.
     */
    @Test
    fun queueMultipleJobs_executesSequentially() = runTest {
        val testScope = TestScope(testScheduler)
        val queue = WorkSerializer(testScope)
        val executionOrder = mutableListOf<Int>()

        // Post three jobs with out-of-order delays
        queue.post {
            delay(300)
            executionOrder.add(1)
        }
        queue.post {
            delay(100)
            executionOrder.add(2)
        }
        queue.post {
            delay(50)
            executionOrder.add(3)
        }

        // Advance the virtual clock until all coroutines are idle
        testScheduler.advanceUntilIdle() // Let all jobs complete

        // Verify the execution order is 1, 2, 3 as they were posted
        assertThat(executionOrder).containsExactly(1, 2, 3).inOrder()

        testScope.cancel()
    }

    /**
     * Tests that if a job throws an exception, the queue does not stop and
     * continues to process subsequent jobs.
     */
    @Test
    fun queueWithException_doesNotStopProcessing() = runTest {
        val testScope = TestScope(testScheduler)
        val queue = WorkSerializer(testScope)
        val executionOrder = mutableListOf<Int>()

        queue.post { executionOrder.add(1) }
        queue.post { throw IllegalStateException("Job failed!") }
        queue.post { executionOrder.add(3) }

        testScheduler.advanceUntilIdle()

        // Verify that the job after the failing one was still executed
        assertThat(executionOrder).containsExactly(1, 3).inOrder()

        testScope.cancel()
    }

    /**
     * Tests that after calling close(), the queue finishes its current work
     * but does not accept new submissions.
     */
    @Test
    fun queueAfterClose_doesNotAcceptNewJobs() = runTest {
        val testScope = TestScope(testScheduler)
        val queue = WorkSerializer(testScope)
        val executionOrder = mutableListOf<Int>()

        // Post initial jobs
        queue.post {
            delay(100)
            executionOrder.add(1)
        }
        queue.post {
            delay(100)
            executionOrder.add(2)
        }

        // Close the queue. No new jobs should be accepted after this.
        queue.close()

        // This job should be ignored because the channel is closed
        queue.post {
            executionOrder.add(3)
        }

        testScheduler.advanceUntilIdle()

        // Verify that only the jobs posted before close() were executed
        assertThat(executionOrder).containsExactly(1, 2).inOrder()

        testScope.cancel()
    }
}
 No newline at end of file