Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a1debf09 authored by Steve Elliott's avatar Steve Elliott
Browse files

[kairos] keep effect coroutines running for duration of build scope

Some API adjustments to ensure that, even if an Events ends, any effects launched by observers continue in the expected build scope.

Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: I363cd841a887d8e633a3c0f290305b1a76d7b944
parent bafc44ea
Loading
Loading
Loading
Loading
+10 −9
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.coroutineScope
@@ -117,7 +118,7 @@ interface BuildScope : StateScope {
    fun <A> Events<A>.observe(
        coroutineContext: CoroutineContext = EmptyCoroutineContext,
        block: EffectScope.(A) -> Unit = {},
    ): Job
    ): DisposableHandle

    /**
     * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
@@ -212,7 +213,7 @@ interface BuildScope : StateScope {
     *
     * @see observe
     */
    fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job =
    fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
        mapBuild(block).observe()

    /**
@@ -568,7 +569,7 @@ interface BuildScope : StateScope {
    /** Returns a [Deferred] containing the next value to be emitted from this [Events]. */
    fun <R> Events<R>.nextDeferred(): Deferred<R> {
        lateinit var next: CompletableDeferred<R>
        val job = nextOnly().observe { next.complete(it) }
        val job = launchScope { nextOnly().observe { next.complete(it) } }
        next = CompletableDeferred<R>(parent = job)
        return next
    }
@@ -618,7 +619,7 @@ interface BuildScope : StateScope {
     * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
     * cancelled).
     */
    fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): Job =
    fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
        mapLatestBuild { block(it) }.observe()

    /**
@@ -627,7 +628,7 @@ interface BuildScope : StateScope {
     *
     * With each invocation of [block], running effects from the previous invocation are cancelled.
     */
    fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): Job {
    fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): DisposableHandle {
        var innerJob: Job? = null
        return observeBuild {
            innerJob?.cancel()
@@ -696,7 +697,7 @@ interface BuildScope : StateScope {
     * emitting) then [block] will be invoked for the first time with the new value; otherwise, it
     * will be invoked with the [current][sample] value.
     */
    fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): Job =
    fun <A> State<A>.observe(block: EffectScope.(A) -> Unit = {}): DisposableHandle =
        now.map { sample() }.mergeWith(changes) { _, new -> new }.observe { block(it) }
}

@@ -731,14 +732,14 @@ fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> =
 *
 * Shorthand for:
 * ```kotlin
 *   now.observe { block() }
 *   launchScope { now.observe { block() } }
 * ```
 */
@ExperimentalKairosApi
fun BuildScope.effect(
    context: CoroutineContext = EmptyCoroutineContext,
    block: EffectScope.() -> Unit,
): Job = now.observe(context) { block() }
): Job = launchScope { now.observe(context) { block() } }

/**
 * Launches [block] in a new coroutine, returning a [Job] bound to the coroutine.
@@ -773,7 +774,7 @@ fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asy
@ExperimentalKairosApi
fun <R> BuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> {
    val result = CompletableDeferred<R>()
    val job = now.observe { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } }
    val job = effect { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } }
    val handle = job.invokeOnCompletion { result.cancel() }
    result.invokeOnCompletion {
        handle.dispose()
+27 −21
Original line number Diff line number Diff line
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.job
@@ -117,12 +118,13 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
    override fun <A> Events<A>.observe(
        coroutineContext: CoroutineContext,
        block: EffectScope.(A) -> Unit,
    ): Job {
    ): DisposableHandle {
        val subRef = AtomicReference<Maybe<Output<A>>>(null)
        val childScope = coroutineScope.childScope()
        // When our scope is cancelled, deactivate this observer.
        childScope.coroutineContext.job.invokeOnCompletion {
        lateinit var cancelHandle: DisposableHandle
        val handle = DisposableHandle {
            subRef.getAndSet(None)?.let { output ->
                cancelHandle.dispose()
                if (output is Just) {
                    @Suppress("DeferredResultUnused")
                    network.transaction("observeEffect cancelled") {
@@ -131,25 +133,29 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                }
            }
        }
        // When our scope is cancelled, deactivate this observer.
        cancelHandle = childScope.coroutineContext.job.invokeOnCompletion { handle.dispose() }
        val localNetwork = LocalNetwork(network, childScope, endSignal)
        // Defer so that we don't suspend the caller
        deferAction {
        val outputNode =
            Output<A>(
                context = coroutineContext,
                    onDeath = { subRef.getAndSet(None)?.let { childScope.cancel() } },
                onDeath = { subRef.set(None) },
                onEmit = { output ->
                    if (subRef.get() is Just) {
                        // Not cancelled, safe to emit
                        val scope =
                                object : EffectScope, TransactionScope by this@BuildScopeImpl {
                            object : EffectScope, TransactionScope by this {
                                override val effectCoroutineScope: CoroutineScope = childScope
                                override val kairosNetwork: KairosNetwork = localNetwork
                            }
                            block(scope, output)
                        scope.block(output)
                    }
                },
            )
        // Defer, in case any EventsLoops / StateLoops still need to be set
        deferAction {
            // Check for immediate cancellation
            if (subRef.get() != null) return@deferAction
            this@observe.takeUntil(endSignal)
                .init
                .connect(evalScope = stateScope.evalScope)
@@ -164,7 +170,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                    }
                } ?: run { childScope.cancel() }
        }
        return childScope.coroutineContext.job
        return handle
    }

    override fun <A, B> Events<A>.mapBuild(transform: BuildScope.(A) -> B): Events<B> {
+71 −1
Original line number Diff line number Diff line
@@ -1083,7 +1083,7 @@ class KairosTests {
    }

    @Test
    fun inpueventsCompleted() = runFrpTest { network ->
    fun inputEventsCompleted() = runFrpTest { network ->
        val results = mutableListOf<Int>()
        val e = network.mutableEvents<Int>()
        activateSpec(network) { e.nextOnly().observe { results.add(it) } }
@@ -1376,6 +1376,76 @@ class KairosTests {
        assertEquals(1, count)
    }

    @Test
    fun observeEffect_disposeHandle() = runFrpTest { network ->
        val input = network.mutableEvents<Unit>()
        val stopper = network.mutableEvents<Unit>()
        var runningCount = 0
        val specJob =
            activateSpec(network) {
                val handle =
                    input.observe {
                        effectCoroutineScope.launch {
                            runningCount++
                            awaitClose { runningCount-- }
                        }
                    }
                stopper.nextOnly().observe { handle.dispose() }
            }
        runCurrent()
        assertEquals(0, runningCount)

        input.emit(Unit)
        assertEquals(1, runningCount)

        input.emit(Unit)
        assertEquals(2, runningCount)

        stopper.emit(Unit)
        assertEquals(2, runningCount)

        input.emit(Unit)
        assertEquals(2, runningCount)

        specJob.cancel()
        runCurrent()
        assertEquals(0, runningCount)
    }

    @Test
    fun observeEffect_takeUntil() = runFrpTest { network ->
        val input = network.mutableEvents<Unit>()
        val stopper = network.mutableEvents<Unit>()
        var runningCount = 0
        val specJob =
            activateSpec(network) {
                input.takeUntil(stopper).observe {
                    effectCoroutineScope.launch {
                        runningCount++
                        awaitClose { runningCount-- }
                    }
                }
            }
        runCurrent()
        assertEquals(0, runningCount)

        input.emit(Unit)
        assertEquals(1, runningCount)

        input.emit(Unit)
        assertEquals(2, runningCount)

        stopper.emit(Unit)
        assertEquals(2, runningCount)

        input.emit(Unit)
        assertEquals(2, runningCount)

        specJob.cancel()
        runCurrent()
        assertEquals(0, runningCount)
    }

    private fun runFrpTest(
        timeout: Duration = 3.seconds,
        block: suspend TestScope.(KairosNetwork) -> Unit,