Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 90de8595 authored by Treehugger Robot's avatar Treehugger Robot Committed by Android (Google) Code Review
Browse files

Merge "[kairos] keep effect coroutines running for duration of build scope" into main

parents c86a8c26 a1debf09
Loading
Loading
Loading
Loading
+10 −9
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
@@ -117,7 +118,7 @@ interface BuildScope : StateScope {
    fun <A> Events<A>.observe(
        coroutineContext: CoroutineContext = EmptyCoroutineContext,
        block: EffectScope.(A) -> Unit = {},
    ): Job
    ): DisposableHandle

    /**
     * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
@@ -212,7 +213,7 @@ interface BuildScope : StateScope {
     *
     * @see observe
     */
    fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job =
    fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
        mapBuild(block).observe()

    /**
@@ -568,7 +569,7 @@ interface BuildScope : StateScope {
    /** Returns a [Deferred] containing the next value to be emitted from this [Events]. */
    fun <R> Events<R>.nextDeferred(): Deferred<R> {
        lateinit var next: CompletableDeferred<R>
        val job = nextOnly().observe { next.complete(it) }
        val job = launchScope { nextOnly().observe { next.complete(it) } }
        next = CompletableDeferred<R>(parent = job)
        return next
    }
@@ -618,7 +619,7 @@ interface BuildScope : StateScope {
     * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
     * cancelled).
     */
    fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): Job =
    fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
        mapLatestBuild { block(it) }.observe()

    /**
@@ -627,7 +628,7 @@ interface BuildScope : StateScope {
     *
     * With each invocation of [block], running effects from the previous invocation are cancelled.
     */
    fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): Job {
    fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): DisposableHandle {
        var innerJob: Job? = null
        return observeBuild {
            innerJob?.cancel()
@@ -696,7 +697,7 @@ interface BuildScope : StateScope {
     * emitting) then [block] will be invoked for the first time with the new value; otherwise, it
     * will be invoked with the [current][sample] value.
     */
    fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): Job =
    fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): DisposableHandle =
        now.map { sample() }.mergeWith(changes) { _, new -> new }.observe { block(it) }
}

@@ -731,14 +732,14 @@ fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> =
 *
 * Shorthand for:
 * ```kotlin
 *   now.observe { block() }
 *   launchScope { now.observe { block() } }
 * ```
 */
@ExperimentalKairosApi
fun BuildScope.effect(
    context: CoroutineContext = EmptyCoroutineContext,
    block: EffectScope.() -> Unit,
): Job = now.observe(context) { block() }
): Job = launchScope { now.observe(context) { block() } }

/**
 * Launches [block] in a new coroutine, returning a [Job] bound to the coroutine.
@@ -773,7 +774,7 @@ fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asy
@ExperimentalKairosApi
fun <R> BuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> {
    val result = CompletableDeferred<R>()
    val job = now.observe { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } }
    val job = effect { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } }
    val handle = job.invokeOnCompletion { result.cancel() }
    result.invokeOnCompletion {
        handle.dispose()
+27 −21
Original line number Diff line number Diff line
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.job
@@ -117,12 +118,13 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
    override fun <A> Events<A>.observe(
        coroutineContext: CoroutineContext,
        block: EffectScope.(A) -> Unit,
    ): Job {
    ): DisposableHandle {
        val subRef = AtomicReference<Maybe<Output<A>>>(null)
        val childScope = coroutineScope.childScope()
        // When our scope is cancelled, deactivate this observer.
        childScope.coroutineContext.job.invokeOnCompletion {
        lateinit var cancelHandle: DisposableHandle
        val handle = DisposableHandle {
            subRef.getAndSet(None)?.let { output ->
                cancelHandle.dispose()
                if (output is Just) {
                    @Suppress("DeferredResultUnused")
                    network.transaction("observeEffect cancelled") {
@@ -131,25 +133,29 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                }
            }
        }
        // When our scope is cancelled, deactivate this observer.
        cancelHandle = childScope.coroutineContext.job.invokeOnCompletion { handle.dispose() }
        val localNetwork = LocalNetwork(network, childScope, endSignal)
        // Defer so that we don't suspend the caller
        deferAction {
        val outputNode =
            Output<A>(
                context = coroutineContext,
                    onDeath = { subRef.getAndSet(None)?.let { childScope.cancel() } },
                onDeath = { subRef.set(None) },
                onEmit = { output ->
                    if (subRef.get() is Just) {
                        // Not cancelled, safe to emit
                        val scope =
                                object : EffectScope, TransactionScope by this@BuildScopeImpl {
                            object : EffectScope, TransactionScope by this {
                                override val effectCoroutineScope: CoroutineScope = childScope
                                override val kairosNetwork: KairosNetwork = localNetwork
                            }
                            block(scope, output)
                        scope.block(output)
                    }
                },
            )
        // Defer, in case any EventsLoops / StateLoops still need to be set
        deferAction {
            // Check for immediate cancellation
            if (subRef.get() != null) return@deferAction
            this@observe.takeUntil(endSignal)
                .init
                .connect(evalScope = stateScope.evalScope)
@@ -164,7 +170,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                    }
                } ?: run { childScope.cancel() }
        }
        return childScope.coroutineContext.job
        return handle
    }

    override fun <A, B> Events<A>.mapBuild(transform: BuildScope.(A) -> B): Events<B> {
+71 −1
Original line number Diff line number Diff line
@@ -1083,7 +1083,7 @@ class KairosTests {
    }

    @Test
    fun inpueventsCompleted() = runFrpTest { network ->
    fun inputEventsCompleted() = runFrpTest { network ->
        val results = mutableListOf<Int>()
        val e = network.mutableEvents<Int>()
        activateSpec(network) { e.nextOnly().observe { results.add(it) } }
@@ -1376,6 +1376,76 @@ class KairosTests {
        assertEquals(1, count)
    }

    @Test
    fun observeEffect_disposeHandle() = runFrpTest { network ->
        val input = network.mutableEvents<Unit>()
        val stopper = network.mutableEvents<Unit>()
        var runningCount = 0
        val specJob =
            activateSpec(network) {
                val handle =
                    input.observe {
                        effectCoroutineScope.launch {
                            runningCount++
                            awaitClose { runningCount-- }
                        }
                    }
                stopper.nextOnly().observe { handle.dispose() }
            }
        runCurrent()
        assertEquals(0, runningCount)

        input.emit(Unit)
        assertEquals(1, runningCount)

        input.emit(Unit)
        assertEquals(2, runningCount)

        stopper.emit(Unit)
        assertEquals(2, runningCount)

        input.emit(Unit)
        assertEquals(2, runningCount)

        specJob.cancel()
        runCurrent()
        assertEquals(0, runningCount)
    }

    @Test
    fun observeEffect_takeUntil() = runFrpTest { network ->
        val input = network.mutableEvents<Unit>()
        val stopper = network.mutableEvents<Unit>()
        var runningCount = 0
        val specJob =
            activateSpec(network) {
                input.takeUntil(stopper).observe {
                    effectCoroutineScope.launch {
                        runningCount++
                        awaitClose { runningCount-- }
                    }
                }
            }
        runCurrent()
        assertEquals(0, runningCount)

        input.emit(Unit)
        assertEquals(1, runningCount)

        input.emit(Unit)
        assertEquals(2, runningCount)

        stopper.emit(Unit)
        assertEquals(2, runningCount)

        input.emit(Unit)
        assertEquals(2, runningCount)

        specJob.cancel()
        runCurrent()
        assertEquals(0, runningCount)
    }

    private fun runFrpTest(
        timeout: Duration = 3.seconds,
        block: suspend TestScope.(KairosNetwork) -> Unit,