Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6dfcf25a authored by Treehugger Robot's avatar Treehugger Robot Committed by Android (Google) Code Review
Browse files

Merge changes I9dd35754,I82c2f1d4,I31b82c5c,I0a851515 into main

* changes:
  [kairos] fix early termination of state accumulation
  [kairos] reduce coroutine usage in buildscope
  [kairos] annotate combinators as experimental
  [kairos] remove redundant NetworkScope.schedule(MuxNode)
parents b496de49 2570a8df
Loading
Loading
Loading
Loading
+22 −1
Original line number Diff line number Diff line
@@ -28,15 +28,18 @@ import kotlinx.coroutines.flow.conflate
 * Returns a [TFlow] that emits the value sampled from the [Transactional] produced by each emission
 * of the original [TFlow], within the same transaction of the original emission.
 */
@ExperimentalFrpApi
fun <A> TFlow<Transactional<A>>.sampleTransactionals(): TFlow<A> = map { it.sample() }

/** @see FrpTransactionScope.sample */
@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.sample(
    state: TState<B>,
    transform: suspend FrpTransactionScope.(A, B) -> C,
): TFlow<C> = map { transform(it, state.sample()) }

/** @see FrpTransactionScope.sample */
@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.sample(
    transactional: Transactional<B>,
    transform: suspend FrpTransactionScope.(A, B) -> C,
@@ -51,6 +54,7 @@ fun <A, B, C> TFlow<A>.sample(
 *
 * @see sample
 */
@ExperimentalFrpApi
fun <A, B, C> TFlow<A>.samplePromptly(
    state: TState<B>,
    transform: suspend FrpTransactionScope.(A, B) -> C,
@@ -74,6 +78,7 @@ fun <A, B, C> TFlow<A>.samplePromptly(
 * Returns a cold [Flow] that, when collected, emits from this [TFlow]. [network] is needed to
 * transactionally connect to / disconnect from the [TFlow] when collection starts/stops.
 */
@ExperimentalFrpApi
fun <A> TFlow<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
    channelFlow { network.activateSpec { observe { trySend(it) } } }.conflate()

@@ -81,6 +86,7 @@ fun <A> TFlow<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
 * Returns a cold [Flow] that, when collected, emits from this [TState]. [network] is needed to
 * transactionally connect to / disconnect from the [TState] when collection starts/stops.
 */
@ExperimentalFrpApi
fun <A> TState<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
    channelFlow { network.activateSpec { observe { trySend(it) } } }.conflate()

@@ -90,6 +96,7 @@ fun <A> TState<A>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
 *
 * When collection is cancelled, so is the [FrpSpec]. This means all ongoing work is cleaned up.
 */
@ExperimentalFrpApi
@JvmName("flowSpecToColdConflatedFlow")
fun <A> FrpSpec<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
    channelFlow { network.activateSpec { applySpec().observe { trySend(it) } } }.conflate()
@@ -100,6 +107,7 @@ fun <A> FrpSpec<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
 *
 * When collection is cancelled, so is the [FrpSpec]. This means all ongoing work is cleaned up.
 */
@ExperimentalFrpApi
@JvmName("stateSpecToColdConflatedFlow")
fun <A> FrpSpec<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
    channelFlow { network.activateSpec { applySpec().observe { trySend(it) } } }.conflate()
@@ -108,6 +116,7 @@ fun <A> FrpSpec<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
 * Returns a cold [Flow] that, when collected, applies this [Transactional] in a new transaction in
 * this [network], and then emits from the returned [TFlow].
 */
@ExperimentalFrpApi
@JvmName("transactionalFlowToColdConflatedFlow")
fun <A> Transactional<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
    channelFlow { network.activateSpec { sample().observe { trySend(it) } } }.conflate()
@@ -116,6 +125,7 @@ fun <A> Transactional<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A
 * Returns a cold [Flow] that, when collected, applies this [Transactional] in a new transaction in
 * this [network], and then emits from the returned [TState].
 */
@ExperimentalFrpApi
@JvmName("transactionalStateToColdConflatedFlow")
fun <A> Transactional<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
    channelFlow { network.activateSpec { sample().observe { trySend(it) } } }.conflate()
@@ -126,6 +136,7 @@ fun <A> Transactional<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<
 *
 * When collection is cancelled, so is the [FrpStateful]. This means all ongoing work is cleaned up.
 */
@ExperimentalFrpApi
@JvmName("statefulFlowToColdConflatedFlow")
fun <A> FrpStateful<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
    channelFlow { network.activateSpec { applyStateful().observe { trySend(it) } } }.conflate()
@@ -136,11 +147,13 @@ fun <A> FrpStateful<TFlow<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A>
 *
 * When collection is cancelled, so is the [FrpStateful]. This means all ongoing work is cleaned up.
 */
@ExperimentalFrpApi
@JvmName("statefulStateToColdConflatedFlow")
fun <A> FrpStateful<TState<A>>.toColdConflatedFlow(network: FrpNetwork): Flow<A> =
    channelFlow { network.activateSpec { applyStateful().observe { trySend(it) } } }.conflate()

/** Return a [TFlow] that emits from the original [TFlow] only when [state] is `true`. */
@ExperimentalFrpApi
fun <A> TFlow<A>.filter(state: TState<Boolean>): TFlow<A> = filter { state.sample() }

private fun Iterable<Boolean>.allTrue() = all { it }
@@ -148,13 +161,15 @@ private fun Iterable<Boolean>.allTrue() = all { it }
private fun Iterable<Boolean>.anyTrue() = any { it }

/** Returns a [TState] that is `true` only when all of [states] are `true`. */
@ExperimentalFrpApi
fun allOf(vararg states: TState<Boolean>): TState<Boolean> = combine(*states) { it.allTrue() }

/** Returns a [TState] that is `true` when any of [states] are `true`. */
@ExperimentalFrpApi
fun anyOf(vararg states: TState<Boolean>): TState<Boolean> = combine(*states) { it.anyTrue() }

/** Returns a [TState] containing the inverse of the Boolean held by the original [TState]. */
fun not(state: TState<Boolean>): TState<Boolean> = state.mapCheapUnsafe { !it }
@ExperimentalFrpApi fun not(state: TState<Boolean>): TState<Boolean> = state.mapCheapUnsafe { !it }

/**
 * Represents a modal FRP sub-network.
@@ -168,6 +183,7 @@ fun not(state: TState<Boolean>): TState<Boolean> = state.mapCheapUnsafe { !it }
 *
 * @see FrpStatefulMode
 */
@ExperimentalFrpApi
fun interface FrpBuildMode<out A> {
    /**
     * Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a
@@ -183,6 +199,7 @@ fun interface FrpBuildMode<out A> {
 *
 * @see FrpBuildMode
 */
@ExperimentalFrpApi
val <A> FrpBuildMode<A>.compiledFrpSpec: FrpSpec<TState<A>>
    get() = frpSpec {
        var modeChangeEvents by TFlowLoop<FrpBuildMode<A>>()
@@ -206,6 +223,7 @@ val <A> FrpBuildMode<A>.compiledFrpSpec: FrpSpec<TState<A>>
 *
 * @see FrpBuildMode
 */
@ExperimentalFrpApi
fun interface FrpStatefulMode<out A> {
    /**
     * Invoked when this mode is enabled. Returns a value and a [TFlow] that signals a switch to a
@@ -221,6 +239,7 @@ fun interface FrpStatefulMode<out A> {
 *
 * @see FrpBuildMode
 */
@ExperimentalFrpApi
val <A> FrpStatefulMode<A>.compiledStateful: FrpStateful<TState<A>>
    get() = statefully {
        var modeChangeEvents by TFlowLoop<FrpStatefulMode<A>>()
@@ -237,6 +256,7 @@ val <A> FrpStatefulMode<A>.compiledStateful: FrpStateful<TState<A>>
 * Runs [spec] in this [FrpBuildScope], and then re-runs it whenever [rebuildSignal] emits. Returns
 * a [TState] that holds the result of the currently-active [FrpSpec].
 */
@ExperimentalFrpApi
fun <A> FrpBuildScope.rebuildOn(rebuildSignal: TFlow<*>, spec: FrpSpec<A>): TState<A> =
    rebuildSignal.map { spec }.holdLatestSpec(spec)

@@ -248,5 +268,6 @@ fun <A> FrpBuildScope.rebuildOn(rebuildSignal: TFlow<*>, spec: FrpSpec<A>): TSta
 *     stateChanges.map { WithPrev(previousValue = sample(), newValue = it) }
 * ```
 */
@ExperimentalFrpApi
val <A> TState<A>.transitions: TFlow<WithPrev<A, A>>
    get() = stateChanges.map { WithPrev(previousValue = sample(), newValue = it) }
+13 −17
Original line number Diff line number Diff line
@@ -34,9 +34,9 @@ import com.android.systemui.kairos.TFlowInit
import com.android.systemui.kairos.groupByKey
import com.android.systemui.kairos.init
import com.android.systemui.kairos.internal.util.childScope
import com.android.systemui.kairos.internal.util.launchOnCancel
import com.android.systemui.kairos.internal.util.mapValuesParallel
import com.android.systemui.kairos.launchEffect
import com.android.systemui.kairos.mergeLeft
import com.android.systemui.kairos.util.Just
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.None
@@ -49,7 +49,6 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CompletableJob
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.Job
@@ -87,7 +86,6 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
    ): TFlow<A> {
        var job: Job? = null
        val stopEmitter = newStopEmitter()
        val handle = this.job.invokeOnCompletion { stopEmitter.emit(Unit) }
        // Create a child scope that will be kept alive beyond the end of this transaction.
        val childScope = coroutineScope.childScope()
        lateinit var emitter: Pair<T, S>
@@ -99,7 +97,6 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                        reenterBuildScope(this@BuildScopeImpl, childScope).runInBuildScope {
                            launchEffect {
                                builder(emitter.second)
                                handle.dispose()
                                stopEmitter.emit(Unit)
                            }
                        }
@@ -110,7 +107,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                },
            )
        emitter = constructFlow(inputNode)
        return with(frpScope) { emitter.first.takeUntil(stopEmitter) }
        return with(frpScope) { emitter.first.takeUntil(mergeLeft(stopEmitter, endSignal)) }
    }

    private fun <T> tFlowInternal(builder: suspend FrpProducerScope<T>.() -> Unit): TFlow<T> =
@@ -164,7 +161,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
        val subRef = AtomicReference<Maybe<Output<A>>>(null)
        val childScope = coroutineScope.childScope()
        // When our scope is cancelled, deactivate this observer.
        childScope.launchOnCancel(CoroutineName("TFlow.observeEffect")) {
        childScope.coroutineContext.job.invokeOnCompletion {
            subRef.getAndSet(None)?.let { output ->
                if (output is Just) {
                    @Suppress("DeferredResultUnused")
@@ -215,7 +212,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                    } else if (needsEval) {
                        outputNode.schedule(evalScope = stateScope.evalScope)
                    }
                } ?: childScope.cancel()
                } ?: run { childScope.cancel() }
        }
        return childScope.coroutineContext.job
    }
@@ -229,10 +226,7 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
                "mapBuild",
                mapImpl({ init.connect(evalScope = this) }) { spec ->
                        reenterBuildScope(outerScope = this@BuildScopeImpl, childScope)
                            .runInBuildScope {
                                val (result, _) = asyncScope { transform(spec) }
                                result.get()
                            }
                            .runInBuildScope { transform(spec) }
                    }
                    .cached(),
            )
@@ -304,12 +298,14 @@ internal class BuildScopeImpl(val stateScope: StateScopeImpl, val coroutineScope
        childScope.coroutineContext.job.invokeOnCompletion { stopEmitter.emit(Unit) }
        // Ensure that once this transaction is done, the new child scope enters the completing
        // state (kept alive so long as there are child jobs).
        scheduleOutput(
            OneShot {
                // TODO: don't like this cast
                (childScope.coroutineContext.job as CompletableJob).complete()
            }
        )
        // TODO: need to keep the scope alive if it's used to accumulate state.
        //  Otherwise, stopEmitter will emit early, due to the call to complete().
        //        scheduleOutput(
        //            OneShot {
        //                // TODO: don't like this cast
        //                (childScope.coroutineContext.job as CompletableJob).complete()
        //            }
        //        )
        return BuildScopeImpl(
            stateScope = StateScopeImpl(evalScope = stateScope.evalScope, endSignal = stopEmitter),
            coroutineScope = childScope,
+0 −2
Original line number Diff line number Diff line
@@ -66,8 +66,6 @@ internal interface NetworkScope : InitScope {

    fun schedule(state: TStateSource<*>)

    suspend fun schedule(node: MuxNode<*, *, *>)

    fun scheduleDeactivation(node: PushNode<*>)

    fun scheduleDeactivation(output: Output<*>)
+9 −1
Original line number Diff line number Diff line
@@ -188,6 +188,14 @@ internal sealed class MuxNode<K : Any, V, Output>(val lifecycle: MuxLifecycle<Ou
    }

    abstract fun hasCurrentValueLocked(transactionStore: TransactionStore): Boolean

    fun schedule(evalScope: EvalScope) {
        // TODO: Potential optimization
        //  Detect if this node is guaranteed to have a single upstream within this transaction,
        //  then bypass scheduling it. Instead immediately schedule its downstream and treat this
        //  MuxNode as a Pull (effectively making it a mapCheap).
        depthTracker.schedule(evalScope.scheduler, this)
    }
}

/** An input branch of a mux node, associated with a key. */
@@ -202,7 +210,7 @@ internal class MuxBranchNode<K : Any, V>(private val muxNode: MuxNode<K, V, *>,
        val upstreamResult = upstream.getPushEvent(evalScope)
        if (upstreamResult is Just) {
            muxNode.upstreamData[key] = upstreamResult.value
            evalScope.schedule(muxNode)
            muxNode.schedule(evalScope)
        }
    }

+1 −1
Original line number Diff line number Diff line
@@ -409,7 +409,7 @@ internal fun <K : Any, A> switchDeferredImpl(
                // Schedule for evaluation if any switched-in nodes have already emitted within
                // this transaction.
                if (muxNode.upstreamData.isNotEmpty()) {
                    evalScope.schedule(muxNode)
                    muxNode.schedule(evalScope)
                }
                return muxNode.takeUnless { muxNode.switchedIn.isEmpty() && !isIndirect }
            }
Loading