Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 2570a8df authored by Steve Elliott's avatar Steve Elliott
Browse files

[kairos] fix early termination of state accumulation

Calling CompletableJob.complete() puts the Job in the *completing*
state, where it stays until all child Jobs complete. If there are no
child Jobs, then it completes immedidately.

We want to do this because it allows the structured concurrency tree to
be cleaned up once it is marked as complete. No doing so means that a
build scope is *running* until it is explicitly cancelled. If child Jobs
complete normally, the parent Job stays *running*, wasting some memory.

However, because we also use the Job as a bridge between the pure Kairos
world, and the impure external world. Specifically, when the Job
completes (or is cancelled), we use that signal to terminate any ongoing
state accumulation. This is fine if it is the semantic end of the scope,
but if the scope is still "semantically" valid, but just not running any
more coroutines (via effects), then we still want state accumulation to
continue, even if we want to mark the Job as complete so that the
coroutines infra can free up some memory.

For now, lets just disable the optimization, as the cost probably isn't
too high.

Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: I9dd3575451cb146237ce74c12498e2c764576a1e
parent 2b08d466
Loading
Loading
Loading
Loading
+10 −9
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ import com.android.systemui.kairos.init
import com.android.systemui.kairos.internal.util.childScope
import com.android.systemui.kairos.internal.util.mapValuesParallel
import com.android.systemui.kairos.launchEffect
import com.android.systemui.kairos.mergeLeft
import com.android.systemui.kairos.util.Just
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.None
@@ -85,7 +86,6 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
    ): TFlow<A> {
        var job: Job? = null
        val stopEmitter = newStopEmitter()
        val handle = this.job.invokeOnCompletion { stopEmitter.emit(Unit) }
        // Create a child scope that will be kept alive beyond the end of this transaction.
        val childScope = coroutineScope.childScope()
        lateinit var emitter: Pair<T, S>
@@ -97,7 +97,6 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                        reenterBuildScope(this@BuildScopeImpl, childScope).runInBuildScope {
                            launchEffect {
                                builder(emitter.second)
                                handle.dispose()
                                stopEmitter.emit(Unit)
                            }
                        }
@@ -108,7 +107,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                },
            )
        emitter = constructFlow(inputNode)
        return with(frpScope) { emitter.first.takeUntil(stopEmitter) }
        return with(frpScope) { emitter.first.takeUntil(mergeLeft(stopEmitter, endSignal)) }
    }

    private fun <T> tFlowInternal(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> =
@@ -299,12 +298,14 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
        childScope.coroutineContext.job.invokeOnCompletion { stopEmitter.emit(Unit) }
        // Ensure that once this transaction is done, the new child scope enters the completing
        // state (kept alive so long as there are child jobs).
        scheduleOutput(
            OneShot {
                // TODO: don't like this cast
                (childScope.coroutineContext.job as CompletableJob).complete()
            }
        )
        // TODO: need to keep the scope alive if it's used to accumulate state.
        //  Otherwise, stopEmitter will emit early, due to the call to complete().
        //        scheduleOutput(
        //            OneShot {
        //                // TODO: don't like this cast
        //                (childScope.coroutineContext.job as CompletableJob).complete()
        //            }
        //        )
        return BuildScopeImpl(
            stateScope = StateScopeImpl(evalScope = stateScope.evalScope, endSignal = stopEmitter),
            coroutineScope = childScope,
+20 −0
Original line number Diff line number Diff line
@@ -1299,6 +1299,26 @@ class KairosTests {
        assertEquals(1, lastFlows.value.second.value)
    }

    @Test
    fun buildScope_stateAccumulation() = runFrpTest { network ->
        val input = network.mutableTFlow<Unit>()
        var observedCount: Int? = null
        activateSpec(network) {
            val (c, j) = asyncScope { input.fold(0) { _, x -> x + 1 } }
            deferredBuildScopeAction { c.get().observe { observedCount = it } }
        }
        runCurrent()
        assertEquals(0, observedCount)

        input.emit(Unit)
        runCurrent()
        assertEquals(1, observedCount)

        input.emit(Unit)
        runCurrent()
        assertEquals(2, observedCount)
    }

    @Test
    fun effect() = runFrpTest { network ->
        val input = network.mutableTFlow<Unit>()