Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit ac6c7708 authored by Treehugger Robot's avatar Treehugger Robot Committed by Android (Google) Code Review
Browse files

Merge "Performance tests for chained flow operations" into main

parents 5b3df579 58bd5cf6
Loading
Loading
Loading
Loading
+19 −2
Original line number Diff line number Diff line
@@ -21,6 +21,9 @@ import androidx.benchmark.BlackHole
import androidx.benchmark.ExperimentalBlackHoleApi
import com.android.app.concurrent.benchmark.base.BaseConcurrentBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.ExecutorThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadImmediateScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.ExecutorThreadBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.HandlerThreadBuilder
@@ -201,7 +204,14 @@ class LaunchCoroutineBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope>
    BaseCoroutineBenchmark(param) {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
    }

    @Test
@@ -226,7 +236,14 @@ class MutableStateFlowBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope
    BaseCoroutineBenchmark(param) {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
    }

    @Test
+10 −1
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@ package com.android.app.concurrent.benchmark

import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.ExecutorThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadImmediateScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.StateCollectBenchmark
import com.android.app.concurrent.benchmark.base.StateCombineBenchmark
import com.android.app.concurrent.benchmark.base.times
@@ -42,7 +44,14 @@ class KairosStateCombineBenchmark(threadParam: ThreadFactory<Any, CoroutineScope
    BaseKairosStateBenchmark(threadParam), StateCombineBenchmark {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
    }
}

+75 −59
Original line number Diff line number Diff line
@@ -19,6 +19,8 @@ import androidx.benchmark.BlackHole
import androidx.benchmark.ExperimentalBlackHoleApi
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.ExecutorThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadImmediateScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.UnconfinedExecutorThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.UnsafeImmediateThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.ChainedStateCollectBenchmark
@@ -31,11 +33,15 @@ import com.android.app.concurrent.benchmark.builder.MutableStateFlowBuilder
import com.android.app.concurrent.benchmark.builder.StateBuilder
import com.android.app.concurrent.benchmark.util.ThreadFactory
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.launch
@@ -52,7 +58,14 @@ class MutableStateFlowCombineBenchmark(param: ThreadFactory<Any, CoroutineScope>
    BaseMutableStateFlowBenchmark(param), StateCombineBenchmark {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
    }
}

@@ -107,71 +120,69 @@ class MutableStateFlowUnconfinedBenchmark(
    }
}

private fun <T1, T2> flowOpParam(
    name: String,
    block: (Flow<T1>, Int, CoroutineScope) -> Flow<T2>,
): (Flow<T1>, Int, CoroutineScope) -> Flow<T2> {
    return object : (Flow<T1>, Int, CoroutineScope) -> Flow<T2> {
        override fun invoke(upstream: Flow<T1>, index: Int, scope: CoroutineScope): Flow<T2> {
            return block(upstream, index, scope)
        }

        override fun toString(): String {
            return name
        }
    }
}

@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class ChainedFlowBenchmark(
class FlowOperatorChainBenchmark(
    threadParam: ThreadFactory<Any, CoroutineScope>,
    val chainLength: Int,
    val intermediateOperator: (CoroutineScope, Flow<Double>) -> Flow<Double>,
    val intermediateOperator: (Flow<Int>, Int, CoroutineScope) -> Flow<Int>,
) : BaseCoroutineBenchmark(threadParam) {

    companion object {
        @OptIn(ExperimentalCoroutinesApi::class)
        @Parameters(name = "{0},{1},{2}")
        @JvmStatic
        fun getDispatchers() =
            listOf(ExecutorThreadScopeBuilder) *
                listOf(1, 2, 5, 10, 25) *
            listOf(
                    object : (CoroutineScope, Flow<Double>) -> Flow<Double> {
                        override fun invoke(
                            scope: CoroutineScope,
                            upstream: Flow<Double>,
                        ): Flow<Double> {
                            return upstream
                        }

                        override fun toString(): String {
                            return "cold"
                        }
                    },
                    object : (CoroutineScope, Flow<Double>) -> Flow<Double> {
                        override fun invoke(
                            scope: CoroutineScope,
                            upstream: Flow<Double>,
                        ): Flow<Double> {
                            return upstream.stateIn(
                ExecutorThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            ) *
                listOf(5, 10, 25) *
                listOf(
                    flowOpParam("cold") { upstream, _, _ -> upstream },
                    flowOpParam("stateIn") { upstream, index, scope ->
                        upstream.stateIn(
                            scope,
                            started = SharingStarted.Eagerly,
                                initialValue = 0.0,
                            initialValue = index,
                        )
                        }

                        override fun toString(): String {
                            return "stateIn"
                        }
                    },
                    object : (CoroutineScope, Flow<Double>) -> Flow<Double> {
                        override fun invoke(
                            scope: CoroutineScope,
                            upstream: Flow<Double>,
                        ): Flow<Double> {
                            return upstream.conflate()
                        }

                        override fun toString(): String {
                            return "conflate"
                        }
                    flowOpParam("conflate") { upstream, _, _ -> upstream.conflate() },
                    flowOpParam("buffer-2") { upstream, _, _ -> upstream.buffer(2) },
                    flowOpParam("buffer-4") { upstream, _, _ -> upstream.buffer(4) },
                    flowOpParam("distinctUntilChanged") { upstream, _, _ ->
                        upstream.distinctUntilChanged()
                    },
                    flowOpParam("flatMapLatest-cold") { upstream, _, _ ->
                        upstream.flatMapLatest { value -> flow { emit(value) } }
                    },
                    object : (CoroutineScope, Flow<Double>) -> Flow<Double> {
                        override fun invoke(
                            scope: CoroutineScope,
                            upstream: Flow<Double>,
                        ): Flow<Double> {
                            return upstream.buffer(2)
                    flowOpParam<Int, Int>("flatMapLatest-state") { upstream, _, _ ->
                        val odds = MutableStateFlow(0)
                        val evens = MutableStateFlow(0)
                        upstream.flatMapLatest { value ->
                            if (value % 2 == 0) {
                                evens.value = value
                                evens
                            } else {
                                odds.value = value
                                odds
                            }

                        override fun toString(): String {
                            return "buffer-2"
                        }
                    },
                )
@@ -180,23 +191,28 @@ class ChainedFlowBenchmark(
    @OptIn(ExperimentalBlackHoleApi::class)
    @Test
    fun benchmark() {
        val sourceState = MutableStateFlow(0.0)
        var receivedVal = 0.0
        val stateChain = mutableListOf<Flow<Double>>()
        val sourceState = MutableStateFlow(0)
        var receivedVal = 0
        val flowChain = mutableListOf<Flow<Int>>()
        repeat(chainLength) { i ->
            val upstream = if (i == 0) sourceState else stateChain.last()
            stateChain.add(intermediateOperator(bgScope, upstream.map { it + 1 }))
            val upstream = if (i == 0) sourceState else flowChain.last()
            flowChain.add(intermediateOperator(upstream.map { it + 1 }, i, bgScope))
        }
        benchmarkRule.runBenchmark {
            beforeFirstIteration(count = 1) { barrier ->
                bgScope.launch {
                    stateChain.last().collect {
                    flowChain.last().collect {
                        receivedVal = it
                        barrier.countDown()
                    }
                }
            }
            mainBlock { n -> sourceState.value = n.toDouble() }
            mainBlock { n -> sourceState.value = n }
            stateChecker(
                isInExpectedState = { n -> receivedVal == n + chainLength },
                expectedStr = "receivedVal == n + chainLength",
                expectedCalc = { n -> "$receivedVal == $n + $chainLength" },
            )
            @OptIn(ExperimentalBlackHoleApi::class)
            afterLastIteration { BlackHole.consume(receivedVal) }
        }
+6 −1
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@ package com.android.app.concurrent.benchmark

import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.ExecutorThreadBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.HandlerImmediateThreadBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.HandlerThreadBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.UnconfinedThreadBuilder
import com.android.app.concurrent.benchmark.base.ChainedStateCollectBenchmark
import com.android.app.concurrent.benchmark.base.StateCollectBenchmark
@@ -40,7 +42,10 @@ class SimpleStateHolderCombineBenchmark(param: ThreadFactory<Any, Executor>) :
    BaseSimpleStateHolderBenchmark(param), StateCombineBenchmark {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = threadBuilders
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() =
            listOf(ExecutorThreadBuilder, HandlerThreadBuilder, HandlerImmediateThreadBuilder)
    }
}

+0 −7
Original line number Diff line number Diff line
@@ -112,13 +112,6 @@ abstract class BaseCoroutineBenchmark(val threadParam: ThreadFactory<Any, Corout
                quitScheduler = { scope -> scope.cancel() },
                stopThread = {},
            )

        val threadBuilders =
            listOf(
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
                ExecutorThreadScopeBuilder,
            )
    }

    protected lateinit var bgScope: CoroutineScope
Loading