Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e3043f9f authored by Treehugger Robot's avatar Treehugger Robot Committed by Android (Google) Code Review
Browse files

Merge "tracinglib: add delegated types for tracing" into main

parents 828af6a7 d09819de
Loading
Loading
Loading
Loading
+163 −62
Original line number Diff line number Diff line
@@ -14,21 +14,25 @@
 * limitations under the License.
 */

@file:OptIn(ExperimentalTypeInference::class)

package com.android.app.tracing.coroutines.flow

import com.android.app.tracing.coroutines.CoroutineTraceName
import com.android.app.tracing.coroutines.traceCoroutine
import com.android.app.tracing.coroutines.traceName
import com.android.app.tracing.traceSection
import kotlin.experimental.ExperimentalTypeInference
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.filter
@@ -42,14 +46,14 @@ import kotlinx.coroutines.flow.stateIn
import kotlinx.coroutines.flow.transform

/** @see kotlinx.coroutines.flow.internal.unsafeFlow */
@OptIn(ExperimentalTypeInference::class)
@PublishedApi
internal inline fun <T> unsafeFlow(
    name: String,
    crossinline block: suspend FlowCollector<T>.() -> Unit,
    @BuilderInference crossinline block: suspend FlowCollector<T>.() -> Unit
): Flow<T> {
    return object : Flow<T> {
        override suspend fun collect(collector: FlowCollector<T>) {
            traceCoroutine("collect:$name") { collector.block() }
            collector.block()
        }
    }
}
@@ -57,45 +61,141 @@ internal inline fun <T> unsafeFlow(
/** @see kotlinx.coroutines.flow.unsafeTransform */
@PublishedApi
internal inline fun <T, R> Flow<T>.unsafeTransform(
    name: String,
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit,
): Flow<R> = unsafeFlow(name) { collect { value -> transform(value) } }
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = unsafeFlow { collect { value -> transform(value) } }

@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private open class TracedSharedFlow<T>(private val name: String, private val flow: SharedFlow<T>) :
    SharedFlow<T> by flow {
    override val replayCache: List<T>
        get() = traceSection("replayCache:$name") { flow.replayCache }

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        traceCoroutine("collect:$name") {
            flow.collect { traceCoroutine("emit:$name") { collector.emit(it) } }
        }
    }
}

@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private open class TracedStateFlow<T>(private val name: String, private val flow: StateFlow<T>) :
    StateFlow<T> by flow, TracedSharedFlow<T>(name, flow) {
    override val value: T
        get() = traceCoroutine("get:$name") { flow.value }

    override val replayCache: List<T>
        get() = super.replayCache

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        super.collect(collector)
    }
}

@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private class TracedMutableSharedFlow<T>(
    private val name: String,
    private val flow: MutableSharedFlow<T>,
) : MutableSharedFlow<T> by flow, TracedSharedFlow<T>(name, flow) {

    override val replayCache: List<T>
        get() = super.replayCache

    override suspend fun emit(value: T) {
        traceCoroutine("emit:$name") { flow.emit(value) }
    }

    override fun tryEmit(value: T): Boolean {
        return traceCoroutine("tryEmit:$name") { flow.tryEmit(value) }
    }

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        super.collect(collector)
    }
}

@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private class TracedMutableStateFlow<T>(
    private val name: String,
    private val flow: MutableStateFlow<T>,
) : MutableStateFlow<T> by flow, TracedStateFlow<T>(name, flow) {

    override var value: T
        get() = super.value
        set(newValue) {
            traceSection("updateState:$name") { flow.value = newValue }
        }

    override val replayCache: List<T>
        get() = super.replayCache

    override suspend fun emit(value: T) {
        traceCoroutine("emit:$name") { flow.emit(value) }
    }

    override suspend fun collect(collector: FlowCollector<T>): Nothing {
        traceCoroutine("collect:$name") {
            flow.collect { traceCoroutine("emit:$name") { collector.emit(it) } }
        }
    }

    override fun compareAndSet(expect: T, update: T): Boolean {
        traceSection("updateState:$name") {
            return flow.compareAndSet(expect, update)
        }
    }
}

/**
 * Helper for adding trace sections for when a trace is collected.
 *
 * For example, the following would `emit(1)` from a trace section named "a" and collect in section
 * named "b".
 * For example, the following would `emit(1)` from a trace section named "my-flow" and collect in a
 * coroutine scope named "my-launch".
 *
 * ```
 *   launch(nameCoroutine("b") {
 *   val flow {
 *     // The open trace section here would be:
 *     // "coroutine execution;my-launch", and "collect:my-flow"
 *     emit(1)
 *   }
 *     .flowName("a")
 *   launchTraced("my-launch") {
 *     .flowName("my-flow")
 *     .collect {
 *       // The open trace sections here would be "collect:a" and "a:emit"
 *       // The open trace sections here would be:
 *       // "coroutine execution;my-launch", "collect:my-flow", and "emit:my-flow"
 *     }
 *   }
 * ```
 *
 * TODO(b/334171711): Rename via @Deprecated("Renamed to .traceAs()", ReplaceWith("traceAs(name)"))
 */
public fun <T> Flow<T>.flowName(name: String): Flow<T> {
public fun <T> Flow<T>.flowName(name: String): Flow<T> = traceAs(name)

public fun <T> Flow<T>.traceAs(name: String): Flow<T> {
    return if (com.android.systemui.Flags.coroutineTracing()) {
        unsafeTransform(name) { traceCoroutine("emit") { emit(it) } }
        unsafeFlow {
            traceCoroutine("collect:$name") {
                collect { value -> traceCoroutine("emit:$name") { emit(value) } }
            }
        }
    } else {
        this
    }
}

public fun <T> SharedFlow<T>.traceAs(name: String): SharedFlow<T> =
    if (com.android.systemui.Flags.coroutineTracing()) TracedSharedFlow(name, this) else this

public fun <T> StateFlow<T>.traceAs(name: String): StateFlow<T> =
    if (com.android.systemui.Flags.coroutineTracing()) TracedStateFlow(name, this) else this

public fun <T> MutableSharedFlow<T>.traceAs(name: String): MutableSharedFlow<T> =
    if (com.android.systemui.Flags.coroutineTracing()) TracedMutableSharedFlow(name, this) else this

public fun <T> MutableStateFlow<T>.traceAs(name: String): MutableStateFlow<T> =
    if (com.android.systemui.Flags.coroutineTracing()) TracedMutableStateFlow(name, this) else this

public fun <T> Flow<T>.onEachTraced(name: String, action: suspend (T) -> Unit): Flow<T> {
    return if (com.android.systemui.Flags.coroutineTracing()) {
        unsafeTransform(name) { value ->
            traceCoroutine("onEach:action") { action(value) }
            traceCoroutine("onEach:emit") { emit(value) }
        }
    } else {
        onEach(action)
    }
    return onEach { value -> traceCoroutine(name) { action(value) } }
}

/**
@@ -116,7 +216,7 @@ public fun <T> Flow<T>.onEachTraced(name: String, action: suspend (T) -> Unit):
 */
public suspend fun <T> Flow<T>.collectTraced(name: String, collector: FlowCollector<T>) {
    if (com.android.systemui.Flags.coroutineTracing()) {
        flowName(name).collect(collector)
        traceAs(name).collect(collector)
    } else {
        collect(collector)
    }
@@ -125,7 +225,7 @@ public suspend fun <T> Flow<T>.collectTraced(name: String, collector: FlowCollec
/** @see kotlinx.coroutines.flow.collect */
public suspend fun <T> Flow<T>.collectTraced(name: String) {
    if (com.android.systemui.Flags.coroutineTracing()) {
        flowName(name).collect()
        traceAs(name).collect()
    } else {
        collect()
    }
@@ -140,20 +240,20 @@ public suspend fun <T> Flow<T>.collectTraced(collector: FlowCollector<T>) {
    }
}

@OptIn(ExperimentalTypeInference::class)
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatestTraced(
    name: String,
    @BuilderInference transform: suspend (value: T) -> R,
): Flow<R> {
    return if (com.android.systemui.Flags.coroutineTracing()) {
        val collectName = "mapLatest:$name"
        val actionName = "$collectName:transform"
        traceCoroutine(collectName) { mapLatest { traceCoroutine(actionName) { transform(it) } } }
        traceAs("mapLatest:$name").mapLatest { traceCoroutine(name) { transform(it) } }
    } else {
        mapLatest(transform)
    }
}

@OptIn(ExperimentalTypeInference::class)
@ExperimentalCoroutinesApi
public fun <T, R> Flow<T>.mapLatestTraced(
    @BuilderInference transform: suspend (value: T) -> R
@@ -171,11 +271,7 @@ internal suspend fun <T> Flow<T>.collectLatestTraced(
    action: suspend (value: T) -> Unit,
) {
    if (com.android.systemui.Flags.coroutineTracing()) {
        val collectName = "collectLatest:$name"
        val actionName = "$collectName:action"
        return traceCoroutine(collectName) {
            collectLatest { traceCoroutine(actionName) { action(it) } }
        }
        return traceAs("collectLatest:$name").collectLatest { traceCoroutine(name) { action(it) } }
    } else {
        collectLatest(action)
    }
@@ -198,7 +294,13 @@ public inline fun <T, R> Flow<T>.transformTraced(
): Flow<R> {
    return if (com.android.systemui.Flags.coroutineTracing()) {
        // Safe flow must be used because collector is exposed to the caller
        safeFlow { collect { value -> traceCoroutine("$name:transform") { transform(value) } } }
        safeFlow {
            collect { value ->
                traceCoroutine(name) {
                    return@collect transform(value)
                }
            }
        }
    } else {
        transform(transform)
    }
@@ -210,9 +312,9 @@ public inline fun <T> Flow<T>.filterTraced(
    crossinline predicate: suspend (T) -> Boolean,
): Flow<T> {
    return if (com.android.systemui.Flags.coroutineTracing()) {
        unsafeTransform(name) { value ->
            if (traceCoroutine("filter:predicate") { predicate(value) }) {
                traceCoroutine("filter:emit") { emit(value) }
        unsafeTransform { value ->
            if (traceCoroutine(name) { predicate(value) }) {
                emit(value)
            }
        }
    } else {
@@ -226,9 +328,9 @@ public inline fun <T, R> Flow<T>.mapTraced(
    crossinline transform: suspend (value: T) -> R,
): Flow<R> {
    return if (com.android.systemui.Flags.coroutineTracing()) {
        unsafeTransform(name) { value ->
            val transformedValue = traceCoroutine("map:transform") { transform(value) }
            traceCoroutine("map:emit") { emit(transformedValue) }
        unsafeTransform { value ->
            val transformedValue = traceCoroutine(name) { transform(value) }
            emit(transformedValue)
        }
    } else {
        map(transform)
@@ -243,13 +345,10 @@ public fun <T> Flow<T>.shareInTraced(
    replay: Int = 0,
): SharedFlow<T> {
    // .shareIn calls this.launch(context), where this === scope, and the previous upstream flow's
    // context is passed to launch
    return if (com.android.systemui.Flags.coroutineTracing()) {
            flowOn(CoroutineTraceName(name))
        } else {
            this
        }
        .shareIn(scope, started, replay)
    // context is passed to launch (caveat: the upstream context is only passed to the downstream
    // SharedFlow if certain conditions are met). For instead, if the upstream is a SharedFlow,
    // the `.flowOn()` operator will have no effect.
    return maybeFuseTraceName(name).shareIn(scope, started, replay).traceAs(name)
}

/** @see kotlinx.coroutines.flow.stateIn */
@@ -261,21 +360,23 @@ public fun <T> Flow<T>.stateInTraced(
): StateFlow<T> {
    // .stateIn calls this.launch(context), where this === scope, and the previous upstream flow's
    // context is passed to launch
    return if (com.android.systemui.Flags.coroutineTracing()) {
            flowOn(CoroutineTraceName(name))
        } else {
            this
        }
        .stateIn(scope, started, initialValue)
    return maybeFuseTraceName(name).stateIn(scope, started, initialValue).traceAs(name)
}

/** @see kotlinx.coroutines.flow.stateIn */
public suspend fun <T> Flow<T>.stateInTraced(name: String, scope: CoroutineScope): StateFlow<T> {
    // .stateIn calls this.launch(context), where this === scope, and the previous upstream flow's
    return if (com.android.systemui.Flags.coroutineTracing()) {
            flowOn(CoroutineTraceName(name))
        } else {
            this
    // context is passed to launch
    return maybeFuseTraceName(name).stateIn(scope).traceAs(name)
}
        .stateIn(scope)

public fun <T> MutableSharedFlow<T>.asSharedFlowTraced(name: String): SharedFlow<T> {
    return asSharedFlow().traceAs(name)
}

public fun <T> MutableStateFlow<T>.asStateFlowTraced(name: String): StateFlow<T> {
    return asStateFlow().traceAs(name)
}

private fun <T> Flow<T>.maybeFuseTraceName(name: String): Flow<T> =
    if (com.android.systemui.Flags.coroutineTracing()) flowOn(CoroutineTraceName(name)) else this
+162 −120
Original line number Diff line number Diff line
@@ -18,9 +18,17 @@ package com.example.tracing.demo.experiments
import android.os.Trace
import com.android.app.tracing.TraceUtils.traceAsync
import com.android.app.tracing.coroutines.createCoroutineTracingContext
import com.android.app.tracing.coroutines.flow.asStateFlowTraced
import com.android.app.tracing.coroutines.flow.filterTraced
import com.android.app.tracing.coroutines.flow.flowName
import com.android.app.tracing.coroutines.flow.mapTraced
import com.android.app.tracing.coroutines.flow.shareInTraced
import com.android.app.tracing.coroutines.flow.stateInTraced
import com.android.app.tracing.coroutines.flow.traceAs
import com.android.app.tracing.coroutines.launchInTraced
import com.android.app.tracing.coroutines.launchTraced
import com.android.app.tracing.coroutines.traceCoroutine
import com.example.tracing.demo.FixedThread1
import com.example.tracing.demo.FixedThread2
import javax.inject.Inject
import javax.inject.Singleton
import kotlin.contracts.ExperimentalContracts
@@ -29,30 +37,40 @@ import kotlin.contracts.contract
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.job
import kotlinx.coroutines.launch

@Singleton
class FlowTracingTutorial
@Inject
constructor(
    @FixedThread1 private var dispatcherA: CoroutineDispatcher,
    @FixedThread1 private var dispatcherB: CoroutineDispatcher,
    @FixedThread1 private var dispatcher1: CoroutineDispatcher,
    @FixedThread2 private var dispatcher2: CoroutineDispatcher,
) : Experiment() {

    override val description: String = "Flow tracing tutorial"

    private lateinit var scope: CoroutineScope
    private lateinit var bgScope: CoroutineScope

    @OptIn(ExperimentalContracts::class)
    private suspend inline fun runStep(stepNumber: Int = 0, crossinline block: (Job) -> Unit) {
    private suspend inline fun runStep(stepName: String, crossinline block: () -> Unit) {
        contract { callsInPlace(block, InvocationKind.EXACTLY_ONCE) }
        traceAsync(TRACK_NAME, "Step #$stepNumber") {
            coroutineScope { block(coroutineContext.job) }
        traceAsync(TRACK_NAME, "Step #$stepName") {
            block()
            traceAsync(TRACK_NAME, "running") { forceSuspend(timeMillis = 40) }
            traceAsync(TRACK_NAME, "cleanup") {
                traceAsync(TRACK_NAME, "cancel-main") { scope.coroutineContext.cancelChildren() }
                traceAsync(TRACK_NAME, "cancel-bg") { bgScope.coroutineContext.cancelChildren() }
                forceSuspend(timeMillis = 10)
            }
        }
        traceAsync(TRACK_NAME, "cooldown") { delay(10) }
    }

    private fun createTracingContext(name: String): CoroutineContext {
@@ -63,122 +81,146 @@ constructor(
        )
    }

    /** 1: */
    private fun step1(job: Job) {
        val scope = CoroutineScope(job + dispatcherA + createTracingContext("scope1"))
        val coldFlow = flow {
            forceSuspend("e1", 2)
            traceCoroutine("emit(1)") { emit(0) }
            forceSuspend("e3", 2)
            traceCoroutine("emit(2)") { emit(1) }
            forceSuspend("e5", 2)
        }
        scope.launchTraced("launch1") {
            coldFlow.collect {
                Trace.instant(Trace.TRACE_TAG_APP, "received:$it")
                when (it) {
                    0 -> {
                        forceSuspend("e2", 1)
                    }
                    1 -> {
                        forceSuspend("e4", 1)
                    }
                }
            }
        }
    }

    //    /** 2: */
    //    private fun step2(job: Job) {
    //        val scope = CoroutineScope(job + dispatcherA + createTracingContext("scope2"))
    //        scope.launchTraced("launch2") { coldFlow.collectTraced { forceSuspend("got:$it", 1) }
    // }
    //    }
    //
    //    /** 3: */
    //    private fun step3(job: Job) {
    //        val scope = CoroutineScope(job + dispatcherA + createTracingContext("scope3"))
    //        scope.launchTraced("launch3") {
    //            coldFlow.collectTraced("collect3") { forceSuspend("got:$it", 1) }
    //        }
    //    }
    //
    //    /** 4: */
    //    private fun step4(job: Job) {
    //        val scope = CoroutineScope(job + dispatcherA + createTracingContext("scope3"))
    //        scope.launchTraced("launch3") {
    //            coldFlow.map { it * 2 }.collectTraced("collect4") { forceSuspend("got:$it", 1) }
    //        }
    //    }

    //    /** 5: */
    //    private fun step5(job: Job) {
    //        scope.launchTraced("my-launch") {
    //            coldFlow
    //                .flowOn(dispatcherA)
    //                .map { it * 2 }
    //                .collectTraced("my-collector") { forceSuspend("A:$it", 1) }
    //        }
    //    }
    //
    //    /** 6: */
    //    private fun step6(job: Job) {
    //        scope.launchTraced("my-launch") {
    //            coldFlow
    //                // Alternatively, call `.flowName("hello-cold")` before or after `flowOn`
    // changes
    //                // the dispatcher.
    //                .flowOn(CoroutineTraceName("hello-cold") + dispatcherA)
    //                .map { it * 2 }
    //                .collectTraced("my-collector") { forceSuspend("A:$it", 1) }
    //        }
    //    }
    //
    //    /** 7: */
    //    private fun step7(job: Job) {
    //        // Important: flowName() must be called BEFORE shareIn(). Otherwise it will have no
    // effect.
    //        val sharedFlow = coldFlow.flowName("my-shared-flow").shareIn(bgScope,
    // SharingStarted.Lazily)
    //
    //        scope.launchTraced("my-launch") {
    //            sharedFlow.collectTraced("my-collector") { forceSuspend("A:$it", 1) }
    //        }
    //
    //        bgScope.cancel()
    //    }
    //
    //    /** 8: */
    //    private fun step8(job: Job) {
    //        val state = MutableStateFlow(1)
    //        // `shareIn` launches on the given scope using the context of the flow as a receiver,
    // but
    //        // only if the Flow is a ChannelFlow. MutableStateFlow is not, so it uses a
    //        // EmptyCoroutineContext.
    //        //
    //        // To get the name of the shared flow into the trace context, we will need to walk the
    // stack
    //        // for a name, or modify behavior of `TraceContextElement` to better support this
    // use-case.
    //        val sharedFlow = state.shareIn(bgScope, SharingStarted.Lazily)
    //
    //        scope.launchTraced("my-launch") {
    //            sharedFlow.collectTraced("my-collector") { forceSuspend("A:$it", 1) }
    //        }
    //
    //        bgScope.cancel()
    //    }
    /** 1.1: */
    private fun step1p1() {
        scope.launchTraced("LAUNCH_FOR_COLLECT_1.1") {
            fibFlow.collect { Trace.instant(Trace.TRACE_TAG_APP, "got:$it") }
        }
    }

    /** 1.2: */
    private fun step1p2() {
        fibFlow.launchInTraced("LAUNCH_FOR_COLLECT_1.2", scope)
    }

    /** 2.1: */
    private fun step2p1() {
        val coldFlow = fibFlow.flowName("FIB_FLOW_NAME_2.1")
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.1", scope)
    }

    /** 2.2: */
    private fun step2p2() {
        val coldFlow = fibFlow.flowName("FIB_FLOW_NAME_2.2").flowOn(dispatcher2)
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.2", scope)
    }

    /** 2.3: */
    private fun step2p3() {
        val coldFlow = fibFlow.flowOn(dispatcher2).flowName("FIB_FLOW_NAME_2.3")
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.3", scope)
    }

    /** 2.4: */
    private fun step2p4() {
        val coldFlow = fibFlow.flowName("FIB_AAA").flowOn(dispatcher2).flowName("FIB_BBB")
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_2.4", scope)
    }

    /** 3: */
    private fun step3() {
        val coldFlow =
            fibFlow
                .mapTraced("x2") { it * 2 }
                .filterTraced("%3==0") { it % 3 == 0 }
                .flowName("(fib x 2) % 3 == 0")
        coldFlow.launchInTracedForDemo("LAUNCH_NAME_3", scope)
    }

    /** 4: */
    private fun step4() {
        val sharedFlow =
            fibFlow.shareInTraced("SHARED_FLOW_NAME_4", bgScope, SharingStarted.Eagerly, 3)
        scope.launchTraced("LAUNCH_NAME_4") {
            forceSuspend("before-collect", 5)
            sharedFlow.collect(::traceInstant)
        }
    }

    /** 5.1: */
    private fun step5p1() {
        val sharedFlow =
            fibFlow.stateInTraced("STATE_FLOW_NAME_5.1", bgScope, SharingStarted.Eagerly, 3)
        scope.launchTraced("LAUNCH_NAME_5.1") {
            forceSuspend("before-collect", 5)
            sharedFlow.collect(::traceInstant)
        }
    }

    /** 5.2: */
    private fun step5p2() {
        val sharedFlow =
            fibFlow.shareInTraced("STATE_FLOW_NAME_5.2", bgScope, SharingStarted.Eagerly, 3)
        val stateFlow = sharedFlow.stateInTraced("", bgScope, SharingStarted.Eagerly, 2)
        scope.launchTraced("LAUNCH_NAME_5.2") {
            forceSuspend("before-collect", 5)
            stateFlow.collect(::traceInstant)
        }
    }

    /** 6.1: */
    private fun step6p1() {
        val state = MutableStateFlow(1).traceAs("MUTABLE_STATE_FLOW_6.1")
        state.launchInTraced("LAUNCH_FOR_STATE_FLOW_COLLECT_6.1", scope)
        bgScope.launchTraced("FWD_FIB_TO_STATE_6.1") {
            forceSuspend("before-collect", 5)
            fibFlow.collect {
                traceInstant(it)
                // Manually forward values from the cold flow to the MutableStateFlow
                state.value = it
            }
        }
    }

    /** 6.2: */
    private fun step6p2() {
        val state = MutableStateFlow(1).traceAs("MUTABLE_STATE_FLOW_6.2")
        val readOnlyState = state.asStateFlowTraced("READ_ONLY_STATE_6.2")
        readOnlyState.launchInTraced("LAUNCH_FOR_STATE_FLOW_COLLECT_6.2", scope)
        bgScope.launchTraced("FWD_FIB_TO_STATE_6.2") {
            fibFlow.collect {
                traceInstant(it)
                // Manually forward values from the cold flow to the MutableStateFlow
                state.value = it
            }
        }
    }

    override suspend fun runExperiment(): Unit = coroutineScope {
        launch {
            runStep(1, ::step1)
            //            runStep(2, ::step2)
            //            runStep(3, ::step3)
        }
        //        runStep(4, ::step4)
        //        runStep(5, ::step5)
        //        runStep(6, ::step6)
        //        runStep(7, ::step7)
        //        runStep(8, ::step8)
        val job = coroutineContext.job
        scope = CoroutineScope(job + dispatcher1 + createTracingContext("main-scope"))
        bgScope = CoroutineScope(job + dispatcher2 + createTracingContext("bg-scope"))
        runStep("1.1", ::step1p1)
        runStep("1.2", ::step1p2)
        runStep("2.1", ::step2p1)
        runStep("2.2", ::step2p2)
        runStep("2.3", ::step2p3)
        runStep("2.4", ::step2p4)
        runStep("3", ::step3)
        runStep("4", ::step4)
        runStep("5.1", ::step5p1)
        runStep("5.2", ::step5p2)
        runStep("6.1", ::step6p1)
        runStep("6.2", ::step6p2)
    }
}

private fun <T> Flow<T>.launchInTracedForDemo(name: String, scope: CoroutineScope) {
    scope.launchTraced(name) { collect(::traceInstant) }
}

private fun <T> traceInstant(value: T) {
    Trace.instant(Trace.TRACE_TAG_APP, "got:$value")
}

private val fibFlow = flow {
    var n0 = 0
    var n1 = 1
    while (true) {
        emit(n0)
        val n2 = n0 + n1
        n0 = n1
        n1 = n2
        forceSuspend("after-emit", 1)
    }
}
+8 −3

File changed.

Preview size limit exceeded, changes collapsed.

+47 −71

File changed.

Preview size limit exceeded, changes collapsed.

+208 −84

File changed.

Preview size limit exceeded, changes collapsed.