Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 67c7f485 authored by Treehugger Robot's avatar Treehugger Robot Committed by Android (Google) Code Review
Browse files

Merge changes Ic75286db,I0d48b5ab into main

* changes:
  [kairos] generalize node storage
  [kairos] propagate transaction errors gracefully
parents 3abeb020 02946ae2
Loading
Loading
Loading
Loading
+2 −14
Original line number Diff line number Diff line
@@ -24,7 +24,6 @@ import com.android.systemui.kairos.internal.util.childScope
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.coroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
@@ -134,19 +133,8 @@ internal class LocalFrpNetwork(
    private val scope: CoroutineScope,
    private val endSignal: TFlow<Any>,
) : FrpNetwork {
    override suspend fun <R> transact(block: suspend FrpTransactionScope.() -> R): R {
        val result = CompletableDeferred<R>(coroutineContext[Job])
        @Suppress("DeferredResultUnused")
        network.transaction("FrpNetwork.transact") {
            val buildScope =
                BuildScopeImpl(
                    stateScope = StateScopeImpl(evalScope = this, endSignal = endSignal),
                    coroutineScope = scope,
                )
            buildScope.runInBuildScope { effect { result.complete(block()) } }
        }
        return result.await()
    }
    override suspend fun <R> transact(block: suspend FrpTransactionScope.() -> R): R =
        network.transaction("FrpNetwork.transact") { runInTransactionScope { block() } }.await()

    override suspend fun activateSpec(spec: FrpSpec<*>) {
        val job =
+4 −4
Original line number Diff line number Diff line
@@ -85,7 +85,7 @@ interface FrpStateScope : FrpTransactionScope {
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
    fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
        initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>>

@@ -107,7 +107,7 @@ interface FrpStateScope : FrpTransactionScope {
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
    fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
        initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>>

@@ -131,7 +131,7 @@ interface FrpStateScope : FrpTransactionScope {
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
    fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
        initialTFlows: Map<K, TFlow<V>> = emptyMap()
    ): TFlow<Map<K, V>> = mergeIncrementally(deferredOf(initialTFlows))

@@ -153,7 +153,7 @@ interface FrpStateScope : FrpTransactionScope {
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
    fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
        initialTFlows: Map<K, TFlow<V>> = emptyMap()
    ): TFlow<Map<K, V>> = mergeIncrementallyPromptly(deferredOf(initialTFlows))

+19 −18
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ import com.android.systemui.kairos.internal.TFlowImpl
import com.android.systemui.kairos.internal.activated
import com.android.systemui.kairos.internal.cached
import com.android.systemui.kairos.internal.constInit
import com.android.systemui.kairos.internal.demuxMap
import com.android.systemui.kairos.internal.filterImpl
import com.android.systemui.kairos.internal.filterJustImpl
import com.android.systemui.kairos.internal.init
@@ -35,7 +36,7 @@ import com.android.systemui.kairos.internal.mergeNodes
import com.android.systemui.kairos.internal.mergeNodesLeft
import com.android.systemui.kairos.internal.neverImpl
import com.android.systemui.kairos.internal.switchDeferredImplSingle
import com.android.systemui.kairos.internal.switchPromptImpl
import com.android.systemui.kairos.internal.switchPromptImplSingle
import com.android.systemui.kairos.internal.util.hashString
import com.android.systemui.kairos.util.Either
import com.android.systemui.kairos.util.Left
@@ -344,7 +345,7 @@ fun <K, A> Map<K, TFlow<A>>.merge(): TFlow<Map<K, A>> =
 */
@ExperimentalFrpApi
fun <K, A> TFlow<Map<K, A>>.groupByKey(numKeys: Int? = null): GroupedTFlow<K, A> =
    GroupedTFlow(DemuxImpl({ init.connect(this) }, numKeys))
    GroupedTFlow(demuxMap({ init.connect(this) }, numKeys))

/**
 * Shorthand for `map { mapOf(extractKey(it) to it) }.groupByKey()`
@@ -417,8 +418,8 @@ class GroupedTFlow<in K, out A> internal constructor(internal val impl: DemuxImp
 * that takes effect immediately, see [switchPromptly].
 */
@ExperimentalFrpApi
fun <A> TState<TFlow<A>>.switch(): TFlow<A> {
    return TFlowInit(
fun <A> TState<TFlow<A>>.switch(): TFlow<A> =
    TFlowInit(
        constInit(
            name = null,
            switchDeferredImplSingle(
@@ -433,7 +434,6 @@ fun <A> TState<TFlow<A>>.switch(): TFlow<A> {
            ),
        )
    )
}

/**
 * Returns a [TFlow] that switches to the [TFlow] contained within this [TState] whenever it
@@ -444,21 +444,22 @@ fun <A> TState<TFlow<A>>.switch(): TFlow<A> {
 */
// TODO: parameter to handle coincidental emission from both old and new
@ExperimentalFrpApi
fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> {
    val switchNode =
        switchPromptImpl(
fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> =
    TFlowInit(
        constInit(
            name = null,
            switchPromptImplSingle(
                getStorage = {
                mapOf(Unit to init.connect(this).getCurrentWithEpoch(this).first.init.connect(this))
                    init.connect(this).getCurrentWithEpoch(this).first.init.connect(this)
                },
                getPatches = {
                val patches = init.connect(this).changes
                mapImpl({ patches }) { newFlow -> mapOf(Unit to just(newFlow.init.connect(this))) }
                    mapImpl({ init.connect(this).changes }) { newFlow ->
                        newFlow.init.connect(this)
                    }
                },
            ),
        )
    return TFlowInit(
        constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit).getPushEvent(this) })
    )
}

/**
 * A mutable [TFlow] that provides the ability to [emit] values to the flow, handling backpressure
+3 −2
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ import com.android.systemui.kairos.internal.map
import com.android.systemui.kairos.internal.mapCheap
import com.android.systemui.kairos.internal.mapImpl
import com.android.systemui.kairos.internal.util.hashString
import com.android.systemui.kairos.internal.zipStateMap
import com.android.systemui.kairos.internal.zipStates
import kotlin.reflect.KProperty
import kotlinx.coroutines.CompletableDeferred
@@ -159,12 +160,12 @@ fun <A> Iterable<TState<A>>.combine(): TState<List<A>> {
 * @see TState.combineWith
 */
@ExperimentalFrpApi
fun <K : Any, A> Map<K, TState<A>>.combine(): TState<Map<K, A>> {
fun <K, A> Map<K, TState<A>>.combine(): TState<Map<K, A>> {
    val operatorName = "combine"
    val name = operatorName
    return TStateInit(
        init(name) {
            zipStates(
            zipStateMap(
                name,
                operatorName,
                states = mapValues { it.value.init.connect(evalScope = this) },
+1 −1
Original line number Diff line number Diff line
@@ -133,7 +133,7 @@ internal fun TStateImpl<*>.dump(infoById: MutableMap<Any, InitInfo>, edges: Muta
                            edges.add(Edge(upstream = state.upstream, downstream = state))
                            Mapped(cheap = false)
                        }
                        is DerivedZipped<*, *> -> {
                        is DerivedZipped<*, *, *> -> {
                            state.upstream.forEach { (key, upstream) ->
                                edges.add(
                                    Edge(upstream = upstream, downstream = state, tag = "key=$key")
Loading