Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 58bd5cf6 authored by Peter Kalauskas's avatar Peter Kalauskas
Browse files

Performance tests for chained flow operations

Also, remove common definitions for thread-builders list, moving
parameter definitions to the tests themselves to improve clarity.

Test: atest -c StructuredConcurrencyPerfTests --test-filter '.*FlowOperatorChainBenchmark'
Bug: 404377320
Flag: EXEMPT test
Change-Id: I33c320f13784d24bf11fec371ee3e9b41d48d350
parent e403e450
Loading
Loading
Loading
Loading
+19 −2
Original line number Diff line number Diff line
@@ -21,6 +21,9 @@ import androidx.benchmark.BlackHole
import androidx.benchmark.ExperimentalBlackHoleApi
import com.android.app.concurrent.benchmark.base.BaseConcurrentBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.ExecutorThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadImmediateScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.ExecutorThreadBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.HandlerThreadBuilder
@@ -201,7 +204,14 @@ class LaunchCoroutineBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope>
    BaseCoroutineBenchmark(param) {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
    }

    @Test
@@ -226,7 +236,14 @@ class MutableStateFlowBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope
    BaseCoroutineBenchmark(param) {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
    }

    @Test
+10 −1
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@ package com.android.app.concurrent.benchmark

import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.ExecutorThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadImmediateScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.StateCollectBenchmark
import com.android.app.concurrent.benchmark.base.StateCombineBenchmark
import com.android.app.concurrent.benchmark.base.times
@@ -42,7 +44,14 @@ class KairosStateCombineBenchmark(threadParam: ThreadFactory<Any, CoroutineScope
    BaseKairosStateBenchmark(threadParam), StateCombineBenchmark {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
    }
}

+75 −59
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@ import androidx.benchmark.BlackHole
import androidx.benchmark.ExperimentalBlackHoleApi
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.ExecutorThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadImmediateScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.UnconfinedExecutorThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.UnsafeImmediateThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.ChainedStateCollectBenchmark
@@ -31,11 +33,15 @@ import com.android.app.concurrent.benchmark.builder.MutableStateFlowBuilder
import com.android.app.concurrent.benchmark.builder.StateBuilder
import com.android.app.concurrent.benchmark.util.ThreadFactory
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
@@ -52,7 +58,14 @@ class MutableStateFlowCombineBenchmark(param: ThreadFactory<Any, CoroutineScope>
    BaseMutableStateFlowBenchmark(param), StateCombineBenchmark {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
    }
}

@@ -107,71 +120,69 @@ class MutableStateFlowUnconfinedBenchmark(
    }
}

private fun <T1, T2> flowOpParam(
    name: String,
    block: (Flow<T1>, Int, CoroutineScope) -> Flow<T2>,
): (Flow<T1>, Int, CoroutineScope) -> Flow<T2> {
    return object : (Flow<T1>, Int, CoroutineScope) -> Flow<T2> {
        override fun invoke(upstream: Flow<T1>, index: Int, scope: CoroutineScope): Flow<T2> {
            return block(upstream, index, scope)
        }

        override fun toString(): String {
            return name
        }
    }
}

@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class ChainedFlowBenchmark(
class FlowOperatorChainBenchmark(
    threadParam: ThreadFactory<Any, CoroutineScope>,
    val chainLength: Int,
    val intermediateOperator: (CoroutineScope, Flow<Double>) -> Flow<Double>,
    val intermediateOperator: (Flow<Int>, Int, CoroutineScope) -> Flow<Int>,
) : BaseCoroutineBenchmark(threadParam) {

    companion object {
        @OptIn(ExperimentalCoroutinesApi::class)
        @Parameters(name = "{0},{1},{2}")
        @JvmStatic
        fun getDispatchers() =
            listOf(ExecutorThreadScopeBuilder) *
                listOf(1, 2, 5, 10, 25) *
            listOf(
                    object : (CoroutineScope, Flow<Double>) -> Flow<Double> {
                        override fun invoke(
                            scope: CoroutineScope,
                            upstream: Flow<Double>,
                        ): Flow<Double> {
                            return upstream
                        }

                        override fun toString(): String {
                            return "cold"
                        }
                    },
                    object : (CoroutineScope, Flow<Double>) -> Flow<Double> {
                        override fun invoke(
                            scope: CoroutineScope,
                            upstream: Flow<Double>,
                        ): Flow<Double> {
                            return upstream.stateIn(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            ) *
                listOf(5, 10, 25) *
                listOf(
                    flowOpParam("cold") { upstream, _, _ -> upstream },
                    flowOpParam("stateIn") { upstream, index, scope ->
                        upstream.stateIn(
                            scope,
                            started = SharingStarted.Eagerly,
                                initialValue = 0.0,
                            initialValue = index,
                        )
                        }

                        override fun toString(): String {
                            return "stateIn"
                        }
                    },
                    object : (CoroutineScope, Flow<Double>) -> Flow<Double> {
                        override fun invoke(
                            scope: CoroutineScope,
                            upstream: Flow<Double>,
                        ): Flow<Double> {
                            return upstream.conflate()
                        }

                        override fun toString(): String {
                            return "conflate"
                        }
                    flowOpParam("conflate") { upstream, _, _ -> upstream.conflate() },
                    flowOpParam("buffer-2") { upstream, _, _ -> upstream.buffer(2) },
                    flowOpParam("buffer-4") { upstream, _, _ -> upstream.buffer(4) },
                    flowOpParam("distinctUntilChanged") { upstream, _, _ ->
                        upstream.distinctUntilChanged()
                    },
                    flowOpParam("flatMapLatest-cold") { upstream, _, _ ->
                        upstream.flatMapLatest { value -> flow { emit(value) } }
                    },
                    object : (CoroutineScope, Flow<Double>) -> Flow<Double> {
                        override fun invoke(
                            scope: CoroutineScope,
                            upstream: Flow<Double>,
                        ): Flow<Double> {
                            return upstream.buffer(2)
                    flowOpParam<Int, Int>("flatMapLatest-state") { upstream, _, _ ->
                        val odds = MutableStateFlow(0)
                        val evens = MutableStateFlow(0)
                        upstream.flatMapLatest { value ->
                            if (value % 2 == 0) {
                                evens.value = value
                                evens
                            } else {
                                odds.value = value
                                odds
                            }

                        override fun toString(): String {
                            return "buffer-2"
                        }
                    },
                )
@@ -180,23 +191,28 @@ class ChainedFlowBenchmark(
    @OptIn(ExperimentalBlackHoleApi::class)
    @Test
    fun benchmark() {
        val sourceState = MutableStateFlow(0.0)
        var receivedVal = 0.0
        val stateChain = mutableListOf<Flow<Double>>()
        val sourceState = MutableStateFlow(0)
        var receivedVal = 0
        val flowChain = mutableListOf<Flow<Int>>()
        repeat(chainLength) { i ->
            val upstream = if (i == 0) sourceState else stateChain.last()
            stateChain.add(intermediateOperator(bgScope, upstream.map { it + 1 }))
            val upstream = if (i == 0) sourceState else flowChain.last()
            flowChain.add(intermediateOperator(upstream.map { it + 1 }, i, bgScope))
        }
        benchmarkRule.runBenchmark {
            beforeFirstIteration(count = 1) { barrier ->
                bgScope.launch {
                    stateChain.last().collect {
                    flowChain.last().collect {
                        receivedVal = it
                        barrier.countDown()
                    }
                }
            }
            mainBlock { n -> sourceState.value = n.toDouble() }
            mainBlock { n -> sourceState.value = n }
            stateChecker(
                isInExpectedState = { n -> receivedVal == n + chainLength },
                expectedStr = "receivedVal == n + chainLength",
                expectedCalc = { n -> "$receivedVal == $n + $chainLength" },
            )
            @OptIn(ExperimentalBlackHoleApi::class)
            afterLastIteration { BlackHole.consume(receivedVal) }
        }
+6 −1
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@ package com.android.app.concurrent.benchmark

import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.ExecutorThreadBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.HandlerImmediateThreadBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.HandlerThreadBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.UnconfinedThreadBuilder
import com.android.app.concurrent.benchmark.base.ChainedStateCollectBenchmark
import com.android.app.concurrent.benchmark.base.StateCollectBenchmark
@@ -40,7 +42,10 @@ class SimpleStateHolderCombineBenchmark(param: ThreadFactory<Any, Executor>) :
    BaseSimpleStateHolderBenchmark(param), StateCombineBenchmark {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(ExecutorThreadBuilder, HandlerThreadBuilder, HandlerImmediateThreadBuilder)
    }
}

+0 −7
Original line number Diff line number Diff line
@@ -112,13 +112,6 @@ abstract class BaseCoroutineBenchmark(val threadParam: ThreadFactory<Any, Corout
                quitScheduler = { scope -> scope.cancel() },
                stopThread = {},
            )

        val threadBuilders =
            listOf(
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
                ExecutorThreadScopeBuilder,
            )
    }

    protected lateinit var bgScope: CoroutineScope
Loading