Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 880e5c0c authored by Treehugger Robot's avatar Treehugger Robot Committed by Android (Google) Code Review
Browse files

Merge "Compare complex flow perf to imperative logic" into main

parents 3d34b49c 16cb2de3
Loading
Loading
Loading
Loading
+5 −4
Original line number Original line Diff line number Diff line
@@ -21,19 +21,16 @@ java_defaults {
    name: "structured_concurrency_perf_tests_defaults",
    name: "structured_concurrency_perf_tests_defaults",
    defaults: [
    defaults: [
        "platform_app_defaults",
        "platform_app_defaults",
        "SystemUI_optimized_defaults",
    ],
    ],
    srcs: [
    srcs: [
        "src/**/*.kt",
        "src/**/*.kt",
    ],
    ],
    optimize: {
        proguard_flags_files: ["proguard.flags"],
    },
    static_libs: [
    static_libs: [
        "collector-device-lib",
        "collector-device-lib",
        "tracinglib-platform",
        "tracinglib-platform",
        "kotlinx_coroutines_android",
        "kotlinx_coroutines_android",
        "kairos",
        "kairos",
        "com.android.systemui.util.kotlin",
        "androidx.benchmark_benchmark-common",
        "androidx.benchmark_benchmark-common",
        "androidx.benchmark_benchmark-junit4",
        "androidx.benchmark_benchmark-junit4",
        "androidx.benchmark_benchmark-macro",
        "androidx.benchmark_benchmark-macro",
@@ -53,7 +50,11 @@ android_test {
    name: "StructuredConcurrencyPerfTests",
    name: "StructuredConcurrencyPerfTests",
    defaults: [
    defaults: [
        "structured_concurrency_perf_tests_defaults",
        "structured_concurrency_perf_tests_defaults",
        "SystemUI_optimized_defaults",
    ],
    ],
    optimize: {
        proguard_flags_files: ["proguard.flags"],
    },
    test_config: "configs/structured-concurrency-perf.xml",
    test_config: "configs/structured-concurrency-perf.xml",
    test_options: {
    test_options: {
        extra_test_configs: [
        extra_test_configs: [
+2 −0
Original line number Original line Diff line number Diff line
@@ -17,6 +17,8 @@ package com.android.app.concurrent.benchmark.util


const val DEBUG = true
const val DEBUG = true


const val VERBOSE_DEBUG = false

const val PERFETTO_CONFIG =
const val PERFETTO_CONFIG =
    """
    """
# Enable periodic flushing of the trace buffer into the output file.
# Enable periodic flushing of the trace buffer into the output file.
+145 −0
Original line number Original line Diff line number Diff line
/*
 * Copyright (C) 2025 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.android.app.concurrent.benchmark

import androidx.benchmark.BlackHole
import androidx.benchmark.ExperimentalBlackHoleApi
import com.android.app.concurrent.benchmark.base.ConcurrentBenchmarkRule
import com.android.app.concurrent.benchmark.event.BaseEventBenchmark
import com.android.app.concurrent.benchmark.event.EventContextProvider
import com.android.app.concurrent.benchmark.event.FlowWritableEventBuilder
import com.android.app.concurrent.benchmark.event.IntEventCombiner
import com.android.app.concurrent.benchmark.event.SimpleEvent
import com.android.app.concurrent.benchmark.event.SimpleWritableEventBuilder
import com.android.app.concurrent.benchmark.event.WritableEventFactory
import com.android.app.concurrent.benchmark.util.ExecutorServiceCoroutineScopeBuilder
import com.android.app.concurrent.benchmark.util.ExecutorThreadBuilder
import com.android.app.concurrent.benchmark.util.ThreadFactory
import com.android.app.concurrent.benchmark.util.dbg
import java.util.concurrent.Executor
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.MethodSorters
import org.junit.runners.Parameterized
import org.junit.runners.Parameterized.Parameters

@OptIn(ExperimentalBlackHoleApi::class)
private sealed interface ActivateEventBenchmark<T, E : Any>
    where T : WritableEventFactory<E>, T : EventContextProvider<E>, T : IntEventCombiner<E> {
    val benchmarkRule: ConcurrentBenchmarkRule
    val context: T

    @Test
    fun benchmark_activateEventBenchmark() {
        val numEvents = 4
        val sourceEvents = List(numEvents) { context.createWritableEvent(0) }
        val combinedEvents =
            context.combineIntEvents(sourceEvents) { values ->
                val sum = values.sum()
                dbg { "sum = $sum" }
                sum
            }
        val signal = context.createWritableEvent(0)
        var result = 0
        benchmarkRule.runBenchmark {
            withBarrier(count = numEvents + 1) {
                beforeFirstIteration { barrier ->
                    context.read {
                        signal.observe {
                            if (it == 0) {
                                // trigger the first iteration so the test can begin:
                                repeat(numEvents) {
                                    // in future iterations, this will be handled by the sourceEvent
                                    // observers that are registered on each loop
                                    barrier.countDown()
                                }
                            }
                            // will countDown() after combinedEvents reaches desired state
                            barrier.countDown()
                        }
                    }
                }
                lateinit var lastRead: AutoCloseable
                onEachIteration { n, barrier ->
                    lastRead =
                        context.read {
                            sourceEvents.forEach { sourceEvent ->
                                // will countDown() numEvents times
                                sourceEvent.observe { if (it == n) barrier.countDown() }
                            }
                            combinedEvents.observe {
                                dbg { "combineIntEvents it=$it" }
                                // Only countdown when the sum is a multiple of the number of source
                                // events, which indicates events were incremented as many times as
                                // there are `numEvents` (whether that was done in a random order or
                                // sequential).
                                if (it % numEvents == 0) {
                                    result = it
                                    context.write { signal.update(it) }
                                }
                            }
                        }
                }
                afterEachIteration { lastRead.close() }
            }

            onEachIteration { n ->
                context.write { sourceEvents.forEach { sourceEvent -> sourceEvent.update(n) } }
            }

            stateChecker(
                isInExpectedState = { n -> result == n * numEvents },
                expectedStr = "result == n * $numEvents",
                expectedCalc = { n -> "$result == $n * $numEvents" },
            )

            afterLastIteration { BlackHole.consume(result) }
        }
    }
}

@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class SimpleActivateEventBenchmark(param: ThreadFactory<Any, Executor>) :
    BaseEventBenchmark<Executor, SimpleWritableEventBuilder>(
        param,
        { SimpleWritableEventBuilder(it) },
    ),
    ActivateEventBenchmark<SimpleWritableEventBuilder, SimpleEvent<*>> {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = listOf(ExecutorThreadBuilder)
    }
}

@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class FlowActivateEventBenchmark(param: ThreadFactory<Any, CoroutineScope>) :
    BaseEventBenchmark<CoroutineScope, FlowWritableEventBuilder>(
        param,
        { FlowWritableEventBuilder(it) },
    ),
    ActivateEventBenchmark<FlowWritableEventBuilder, Flow<*>> {

    companion object {
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() = listOf(ExecutorServiceCoroutineScopeBuilder)
    }
}
+187 −105
Original line number Original line Diff line number Diff line
@@ -20,25 +20,33 @@ package com.android.app.concurrent.benchmark
import androidx.benchmark.BlackHole
import androidx.benchmark.BlackHole
import androidx.benchmark.ExperimentalBlackHoleApi
import androidx.benchmark.ExperimentalBlackHoleApi
import com.android.app.concurrent.benchmark.base.BaseConcurrentBenchmark
import com.android.app.concurrent.benchmark.base.BaseConcurrentBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark
import com.android.app.concurrent.benchmark.base.BaseSchedulerBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.ExecutorThreadScopeBuilder
import com.android.app.concurrent.benchmark.base.ConcurrentBenchmarkRule
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadImmediateScopeBuilder
import com.android.app.concurrent.benchmark.event.BaseEventBenchmark
import com.android.app.concurrent.benchmark.base.BaseCoroutineBenchmark.Companion.HandlerThreadScopeBuilder
import com.android.app.concurrent.benchmark.event.EventContextProvider
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark
import com.android.app.concurrent.benchmark.event.FlowWritableEventBuilder
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.ExecutorThreadBuilder
import com.android.app.concurrent.benchmark.event.SimpleEvent
import com.android.app.concurrent.benchmark.base.BaseExecutorBenchmark.Companion.HandlerThreadBuilder
import com.android.app.concurrent.benchmark.event.SimplePublisherImpl
import com.android.app.concurrent.benchmark.util.SimpleStateHolder
import com.android.app.concurrent.benchmark.event.SimpleState
import com.android.app.concurrent.benchmark.util.SimpleSynchronousState
import com.android.app.concurrent.benchmark.event.SimpleSynchronousState
import com.android.app.concurrent.benchmark.event.SimpleWritableEventBuilder
import com.android.app.concurrent.benchmark.event.WritableEventFactory
import com.android.app.concurrent.benchmark.event.asSuspendableObserver
import com.android.app.concurrent.benchmark.util.CyclicCountDownBarrier
import com.android.app.concurrent.benchmark.util.ExecutorServiceCoroutineScopeBuilder
import com.android.app.concurrent.benchmark.util.ExecutorThreadBuilder
import com.android.app.concurrent.benchmark.util.HandlerThreadBuilder
import com.android.app.concurrent.benchmark.util.HandlerThreadImmediateScopeBuilder
import com.android.app.concurrent.benchmark.util.HandlerThreadScopeBuilder
import com.android.app.concurrent.benchmark.util.ThreadFactory
import com.android.app.concurrent.benchmark.util.ThreadFactory
import com.android.app.concurrent.benchmark.util.asSuspendableObserver
import java.util.concurrent.Executor
import java.util.concurrent.Executor
import java.util.concurrent.ExecutorService
import kotlin.coroutines.Continuation
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine
import kotlin.coroutines.suspendCoroutine
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.launch
import org.junit.FixMethodOrder
import org.junit.FixMethodOrder
@@ -56,7 +64,7 @@ class SingleThreadSumDoubleBaselineBenchmark() : BaseConcurrentBenchmark() {
    @Test
    @Test
    fun benchmark() {
    fun benchmark() {
        var sum = 0.0
        var sum = 0.0
        benchmarkRule.runBenchmark { mainBlock { n -> sum += n.toDouble() } }
        benchmarkRule.runBenchmark { onEachIteration { n -> sum += n.toDouble() } }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
}
}
@@ -68,7 +76,7 @@ class SingleThreadSum1xDoMathBaselineBenchmark() : BaseConcurrentBenchmark() {
    @Test
    @Test
    fun benchmark() {
    fun benchmark() {
        var sum = 0.0
        var sum = 0.0
        benchmarkRule.runBenchmark { mainBlock { n -> sum += doMath(n) } }
        benchmarkRule.runBenchmark { onEachIteration { n -> sum += doMath(n) } }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
}
}
@@ -80,14 +88,14 @@ class SingleThreadSum2xDoMathBaselineBenchmark() : BaseConcurrentBenchmark() {
    @Test
    @Test
    fun benchmark() {
    fun benchmark() {
        var sum = 0.0
        var sum = 0.0
        benchmarkRule.runBenchmark { mainBlock { n -> sum += doMath(doMath(n)) } }
        benchmarkRule.runBenchmark { onEachIteration { n -> sum += doMath(doMath(n)) } }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
}
}


@RunWith(Parameterized::class)
@RunWith(Parameterized::class)
class ExecutorDispatchBaselineBenchmark(param: ThreadFactory<ExecutorService, Executor>) :
class ExecutorDispatchBaselineBenchmark(param: ThreadFactory<Any, Executor>) :
    BaseExecutorBenchmark(param) {
    BaseSchedulerBenchmark<Executor>(param) {


    companion object {
    companion object {
        @Parameters(name = "{0}")
        @Parameters(name = "{0}")
@@ -99,22 +107,25 @@ class ExecutorDispatchBaselineBenchmark(param: ThreadFactory<ExecutorService, Ex
    fun benchmark() {
    fun benchmark() {
        var sum = 0.0
        var sum = 0.0
        benchmarkRule.runBenchmark {
        benchmarkRule.runBenchmark {
            onEachIteration(count = 1) { n, barrier ->
            withBarrier(count = 1) {
                beforeFirstIteration { barrier -> scheduler.execute { barrier.countDown() } }
                onEachIteration { n, barrier ->
                    val next = doMath(n)
                    val next = doMath(n)
                executor.execute {
                    scheduler.execute {
                        sum += doMath(next)
                        sum += doMath(next)
                        barrier.countDown()
                        barrier.countDown()
                    }
                    }
                }
                }
            }
            }
        }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
}
}


@RunWith(Parameterized::class)
@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class StartIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<ExecutorService, Executor>) :
class StartIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<Any, Executor>) :
    BaseExecutorBenchmark(param) {
    BaseSchedulerBenchmark<Executor>(param) {


    companion object {
    companion object {
        @Parameters(name = "{0}")
        @Parameters(name = "{0}")
@@ -125,17 +136,20 @@ class StartIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<ExecutorServ
    @Test
    @Test
    fun benchmark() {
    fun benchmark() {
        var sum = 0.0
        var sum = 0.0
        benchmarkRule.runBenchmark {
        suspend fun doMath2x(n: Int, barrier: CyclicCountDownBarrier): Double {
            onEachIteration(count = 1) { n, barrier ->
                suspend {
            val next = doMath(n)
            val next = doMath(n)
                        suspendCoroutine { continuation: Continuation<Double> ->
            return suspendCoroutine { continuation: Continuation<Double> ->
                            executor.execute {
                scheduler.execute {
                    continuation.resume(doMath(next))
                    continuation.resume(doMath(next))
                    barrier.countDown()
                    barrier.countDown()
                }
                }
            }
            }
        }
        }
        benchmarkRule.runBenchmark {
            withBarrier(count = 1) {
                beforeFirstIteration { barrier -> scheduler.execute { barrier.countDown() } }
                onEachIteration { n, barrier ->
                    suspend { doMath2x(n, barrier) }
                        .startCoroutine(
                        .startCoroutine(
                            Continuation(
                            Continuation(
                                context = EmptyCoroutineContext,
                                context = EmptyCoroutineContext,
@@ -150,14 +164,15 @@ class StartIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<ExecutorServ
                        )
                        )
                }
                }
            }
            }
        }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
}
}


@RunWith(Parameterized::class)
@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class ResumeIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<ExecutorService, Executor>) :
class ResumeIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<Any, Executor>) :
    BaseExecutorBenchmark(param) {
    BaseSchedulerBenchmark<Executor>(param) {


    companion object {
    companion object {
        @Parameters(name = "{0}")
        @Parameters(name = "{0}")
@@ -169,19 +184,21 @@ class ResumeIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<ExecutorSer
    fun benchmark() {
    fun benchmark() {
        var sum = 0.0
        var sum = 0.0
        val nextInput = SimpleSynchronousState<Double>()
        val nextInput = SimpleSynchronousState<Double>()
        benchmarkRule.runBenchmark {
        suspend fun doMathForever(barrier: CyclicCountDownBarrier) {
            beforeFirstIteration(count = 1) { barrier ->
                suspend {
            while (true) {
            while (true) {
                val next = nextInput.awaitValue()
                val next = nextInput.awaitValue()
                sum += suspendCoroutine { continuation: Continuation<Double> ->
                sum += suspendCoroutine { continuation: Continuation<Double> ->
                                executor.execute {
                    scheduler.execute {
                        continuation.resume(doMath(next))
                        continuation.resume(doMath(next))
                        barrier.countDown()
                        barrier.countDown()
                    }
                    }
                }
                }
            }
            }
        }
        }
        benchmarkRule.runBenchmark {
            withBarrier(count = 1) {
                beforeFirstIteration { barrier ->
                    suspend { doMathForever(barrier) }
                        .startCoroutine(
                        .startCoroutine(
                            Continuation(
                            Continuation(
                                context = EmptyCoroutineContext,
                                context = EmptyCoroutineContext,
@@ -192,7 +209,8 @@ class ResumeIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<ExecutorSer
                        )
                        )
                    nextInput.putValueOrThrow(0.00)
                    nextInput.putValueOrThrow(0.00)
                }
                }
            mainBlock { n -> nextInput.putValueOrThrow(doMath(n)) }
            }
            onEachIteration { n -> nextInput.putValueOrThrow(doMath(n)) }
        }
        }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
@@ -201,14 +219,14 @@ class ResumeIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<ExecutorSer
@RunWith(Parameterized::class)
@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class LaunchCoroutineBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope>) :
class LaunchCoroutineBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope>) :
    BaseCoroutineBenchmark(param) {
    BaseSchedulerBenchmark<CoroutineScope>(param) {


    companion object {
    companion object {
        @Parameters(name = "{0}")
        @Parameters(name = "{0}")
        @JvmStatic
        @JvmStatic
        fun getDispatchers() =
        fun getDispatchers() =
            listOf(
            listOf(
                ExecutorThreadScopeBuilder,
                ExecutorServiceCoroutineScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
            )
@@ -218,14 +236,17 @@ class LaunchCoroutineBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope>
    fun benchmark() {
    fun benchmark() {
        var sum = 0.0
        var sum = 0.0
        benchmarkRule.runBenchmark {
        benchmarkRule.runBenchmark {
            onEachIteration(count = 1) { n, barrier ->
            withBarrier(count = 1) {
                beforeFirstIteration { barrier -> scheduler.launch { barrier.countDown() } }
                onEachIteration { n, barrier ->
                    val next = doMath(n)
                    val next = doMath(n)
                bgScope.launch {
                    scheduler.launch {
                        sum += doMath(next)
                        sum += doMath(next)
                        barrier.countDown()
                        barrier.countDown()
                    }
                    }
                }
                }
            }
            }
        }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
}
}
@@ -233,14 +254,14 @@ class LaunchCoroutineBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope>
@RunWith(Parameterized::class)
@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class MutableStateFlowBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope>) :
class MutableStateFlowBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope>) :
    BaseCoroutineBenchmark(param) {
    BaseSchedulerBenchmark<CoroutineScope>(param) {


    companion object {
    companion object {
        @Parameters(name = "{0}")
        @Parameters(name = "{0}")
        @JvmStatic
        @JvmStatic
        fun getDispatchers() =
        fun getDispatchers() =
            listOf(
            listOf(
                ExecutorThreadScopeBuilder,
                ExecutorServiceCoroutineScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
                HandlerThreadImmediateScopeBuilder,
            )
            )
@@ -251,15 +272,17 @@ class MutableStateFlowBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope
        var sum = 0.0
        var sum = 0.0
        val state = MutableStateFlow(0.00)
        val state = MutableStateFlow(0.00)
        benchmarkRule.runBenchmark {
        benchmarkRule.runBenchmark {
            beforeFirstIteration(count = 1) { barrier ->
            withBarrier(count = 1) {
                bgScope.launch {
                beforeFirstIteration { barrier ->
                    scheduler.launch {
                        state.collect { next ->
                        state.collect { next ->
                            sum += doMath(next)
                            sum += doMath(next)
                            barrier.countDown()
                            barrier.countDown()
                        }
                        }
                    }
                    }
                }
                }
            mainBlock { n -> state.value = doMath(n) }
            }
            onEachIteration { n -> state.value = doMath(n) }
        }
        }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
@@ -267,8 +290,8 @@ class MutableStateFlowBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope


@RunWith(Parameterized::class)
@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class StateHolderBaselineBenchmark(param: ThreadFactory<ExecutorService, Executor>) :
class SimpleObservableStateBaselineBenchmark(param: ThreadFactory<Any, Executor>) :
    BaseExecutorBenchmark(param) {
    BaseSchedulerBenchmark<Executor>(param) {


    companion object {
    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = listOf(ExecutorThreadBuilder)
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = listOf(ExecutorThreadBuilder)
@@ -277,15 +300,19 @@ class StateHolderBaselineBenchmark(param: ThreadFactory<ExecutorService, Executo
    @Test
    @Test
    fun benchmark() {
    fun benchmark() {
        var sum = 0.0
        var sum = 0.0
        val state = SimpleStateHolder(0.0)
        val state = SimpleState(0.0)
        benchmarkRule.runBenchmark {
        benchmarkRule.runBenchmark {
            beforeFirstIteration(count = 1) { barrier ->
            withBarrier(count = 1) {
                state.addListener(executor, notifyInitial = true) { next ->
                beforeFirstIteration { barrier ->
                    state.listen { next ->
                        scheduler.execute {
                            sum += doMath(next)
                            sum += doMath(next)
                            barrier.countDown()
                            barrier.countDown()
                        }
                        }
                    }
                    }
            mainBlock { n -> state.value = doMath(n) }
                }
            }
            onEachIteration { n -> state.value = doMath(n) }
        }
        }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
@@ -293,9 +320,8 @@ class StateHolderBaselineBenchmark(param: ThreadFactory<ExecutorService, Executo


@RunWith(Parameterized::class)
@RunWith(Parameterized::class)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
class StateHolderIntrinsicCoroutineBaselineBenchmark(
class SimplePublisherIntrinsicCoroutineBaselineBenchmark(param: ThreadFactory<Any, Executor>) :
    param: ThreadFactory<ExecutorService, Executor>
    BaseSchedulerBenchmark<Executor>(param) {
) : BaseExecutorBenchmark(param) {


    companion object {
    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = listOf(ExecutorThreadBuilder)
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = listOf(ExecutorThreadBuilder)
@@ -304,10 +330,12 @@ class StateHolderIntrinsicCoroutineBaselineBenchmark(
    @Test
    @Test
    fun benchmark() {
    fun benchmark() {
        var sum = 0.0
        var sum = 0.0
        val state = SimpleStateHolder(0.00)
        val state = SimplePublisherImpl<Double>()
        val stateWatcher = state.asSuspendableObserver(executor)
        val stateWatcher = state.asSuspendableObserver(scheduler)
        benchmarkRule.runBenchmark {
        benchmarkRule.runBenchmark {
            beforeFirstIteration(count = 1) { barrier ->
            withBarrier(count = 1) {
                beforeFirstIteration { barrier ->
                    scheduler.execute { barrier.countDown() }
                    suspend fun collectLambda(): Nothing {
                    suspend fun collectLambda(): Nothing {
                        while (true) {
                        while (true) {
                            val next = stateWatcher.awaitNextValue()
                            val next = stateWatcher.awaitNextValue()
@@ -319,12 +347,66 @@ class StateHolderIntrinsicCoroutineBaselineBenchmark(
                        Continuation(context = EmptyCoroutineContext, resumeWith = {})
                        Continuation(context = EmptyCoroutineContext, resumeWith = {})
                    )
                    )
                }
                }
            mainBlock { n -> state.value = doMath(n) }
            }
            onEachIteration { n -> state.publish(doMath(n)) }
        }
        }
        BlackHole.consume(sum)
        BlackHole.consume(sum)
    }
    }
}
}


private sealed interface GenericBaselineBenchmark<T, E : Any>
    where T : WritableEventFactory<E>, T : EventContextProvider<E> {
    val benchmarkRule: ConcurrentBenchmarkRule
    val context: T

    @Test
    fun benchmark_doMathBg() {
        val state = context.createWritableEvent(0.00)
        var sum = 0.0
        benchmarkRule.runBenchmark {
            withBarrier(count = 1) {
                beforeFirstIteration { barrier ->
                    context.read {
                        state.observe { next ->
                            sum += doMath(next)
                            barrier.countDown()
                        }
                    }
                }
            }
            onEachIteration { n -> context.write { state.update(doMath(n)) } }
        }
    }
}

@RunWith(Parameterized::class)
class SimpleGenericBaselineBenchmark(param: ThreadFactory<Any, Executor>) :
    BaseEventBenchmark<Executor, SimpleWritableEventBuilder>(
        param,
        { SimpleWritableEventBuilder(it) },
    ),
    GenericBaselineBenchmark<SimpleWritableEventBuilder, SimpleEvent<*>> {

    companion object {
        @Parameters(name = "{0}") @JvmStatic fun getDispatchers() = listOf(ExecutorThreadBuilder)
    }
}

@RunWith(Parameterized::class)
class FlowGenericBaselineBenchmark(param: ThreadFactory<Any, CoroutineScope>) :
    BaseEventBenchmark<CoroutineScope, FlowWritableEventBuilder>(
        param,
        { FlowWritableEventBuilder(it) },
    ),
    GenericBaselineBenchmark<FlowWritableEventBuilder, Flow<*>> {

    companion object {
        @Parameters(name = "{0}")
        @JvmStatic
        fun getDispatchers() = listOf(ExecutorServiceCoroutineScopeBuilder)
    }
}

private fun doMath(num: Number): Double {
private fun doMath(num: Number): Double {
    val n = num.toDouble()
    val n = num.toDouble()
    var sum = 0.0
    var sum = 0.0
+133 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading