Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 02946ae2 authored by Steve Elliott's avatar Steve Elliott
Browse files

[kairos] generalize node storage

Rather than using (Mutable)Maps for all internal storage, provide a
mechanism by which custom Map impls can be used.

Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: Ic75286db27426cda7f41d1f2fdba138b1ce66e2f
parent 7a10baee
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -85,7 +85,7 @@ interface FrpStateScope : FrpTransactionScope {
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
    fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
        initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>>

@@ -107,7 +107,7 @@ interface FrpStateScope : FrpTransactionScope {
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
    fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
        initialTFlows: FrpDeferredValue<Map<K, TFlow<V>>>
    ): TFlow<Map<K, V>>

@@ -131,7 +131,7 @@ interface FrpStateScope : FrpTransactionScope {
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
    fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
        initialTFlows: Map<K, TFlow<V>> = emptyMap()
    ): TFlow<Map<K, V>> = mergeIncrementally(deferredOf(initialTFlows))

@@ -153,7 +153,7 @@ interface FrpStateScope : FrpTransactionScope {
     * @see merge
     */
    @ExperimentalFrpApi
    fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
    fun <K, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementallyPromptly(
        initialTFlows: Map<K, TFlow<V>> = emptyMap()
    ): TFlow<Map<K, V>> = mergeIncrementallyPromptly(deferredOf(initialTFlows))

+19 −18
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ import com.android.systemui.kairos.internal.TFlowImpl
import com.android.systemui.kairos.internal.activated
import com.android.systemui.kairos.internal.cached
import com.android.systemui.kairos.internal.constInit
import com.android.systemui.kairos.internal.demuxMap
import com.android.systemui.kairos.internal.filterImpl
import com.android.systemui.kairos.internal.filterJustImpl
import com.android.systemui.kairos.internal.init
@@ -35,7 +36,7 @@ import com.android.systemui.kairos.internal.mergeNodes
import com.android.systemui.kairos.internal.mergeNodesLeft
import com.android.systemui.kairos.internal.neverImpl
import com.android.systemui.kairos.internal.switchDeferredImplSingle
import com.android.systemui.kairos.internal.switchPromptImpl
import com.android.systemui.kairos.internal.switchPromptImplSingle
import com.android.systemui.kairos.internal.util.hashString
import com.android.systemui.kairos.util.Either
import com.android.systemui.kairos.util.Left
@@ -344,7 +345,7 @@ fun <K, A> Map<K, TFlow<A>>.merge(): TFlow<Map<K, A>> =
 */
@ExperimentalFrpApi
fun <K, A> TFlow<Map<K, A>>.groupByKey(numKeys: Int? = null): GroupedTFlow<K, A> =
    GroupedTFlow(DemuxImpl({ init.connect(this) }, numKeys))
    GroupedTFlow(demuxMap({ init.connect(this) }, numKeys))

/**
 * Shorthand for `map { mapOf(extractKey(it) to it) }.groupByKey()`
@@ -417,8 +418,8 @@ class GroupedTFlow<in K, out A> internal constructor(internal val impl: DemuxImp
 * that takes effect immediately, see [switchPromptly].
 */
@ExperimentalFrpApi
fun <A> TState<TFlow<A>>.switch(): TFlow<A> {
    return TFlowInit(
fun <A> TState<TFlow<A>>.switch(): TFlow<A> =
    TFlowInit(
        constInit(
            name = null,
            switchDeferredImplSingle(
@@ -433,7 +434,6 @@ fun <A> TState<TFlow<A>>.switch(): TFlow<A> {
            ),
        )
    )
}

/**
 * Returns a [TFlow] that switches to the [TFlow] contained within this [TState] whenever it
@@ -444,21 +444,22 @@ fun <A> TState<TFlow<A>>.switch(): TFlow<A> {
 */
// TODO: parameter to handle coincidental emission from both old and new
@ExperimentalFrpApi
fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> {
    val switchNode =
        switchPromptImpl(
fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> =
    TFlowInit(
        constInit(
            name = null,
            switchPromptImplSingle(
                getStorage = {
                mapOf(Unit to init.connect(this).getCurrentWithEpoch(this).first.init.connect(this))
                    init.connect(this).getCurrentWithEpoch(this).first.init.connect(this)
                },
                getPatches = {
                val patches = init.connect(this).changes
                mapImpl({ patches }) { newFlow -> mapOf(Unit to just(newFlow.init.connect(this))) }
                    mapImpl({ init.connect(this).changes }) { newFlow ->
                        newFlow.init.connect(this)
                    }
                },
            ),
        )
    return TFlowInit(
        constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit).getPushEvent(this) })
    )
}

/**
 * A mutable [TFlow] that provides the ability to [emit] values to the flow, handling backpressure
+3 −2
Original line number Diff line number Diff line
@@ -36,6 +36,7 @@ import com.android.systemui.kairos.internal.map
import com.android.systemui.kairos.internal.mapCheap
import com.android.systemui.kairos.internal.mapImpl
import com.android.systemui.kairos.internal.util.hashString
import com.android.systemui.kairos.internal.zipStateMap
import com.android.systemui.kairos.internal.zipStates
import kotlin.reflect.KProperty
import kotlinx.coroutines.CompletableDeferred
@@ -159,12 +160,12 @@ fun <A> Iterable<TState<A>>.combine(): TState<List<A>> {
 * @see TState.combineWith
 */
@ExperimentalFrpApi
fun <K : Any, A> Map<K, TState<A>>.combine(): TState<Map<K, A>> {
fun <K, A> Map<K, TState<A>>.combine(): TState<Map<K, A>> {
    val operatorName = "combine"
    val name = operatorName
    return TStateInit(
        init(name) {
            zipStates(
            zipStateMap(
                name,
                operatorName,
                states = mapValues { it.value.init.connect(evalScope = this) },
+1 −1
Original line number Diff line number Diff line
@@ -133,7 +133,7 @@ internal fun TStateImpl<*>.dump(infoById: MutableMap<Any, InitInfo>, edges: Muta
                            edges.add(Edge(upstream = state.upstream, downstream = state))
                            Mapped(cheap = false)
                        }
                        is DerivedZipped<*, *> -> {
                        is DerivedZipped<*, *, *> -> {
                            state.upstream.forEach { (key, upstream) ->
                                edges.add(
                                    Edge(upstream = upstream, downstream = state, tag = "key=$key")
+53 −34
Original line number Diff line number Diff line
@@ -16,17 +16,20 @@

package com.android.systemui.kairos.internal

import com.android.systemui.kairos.internal.store.ConcurrentHashMapK
import com.android.systemui.kairos.internal.store.MapHolder
import com.android.systemui.kairos.internal.store.MapK
import com.android.systemui.kairos.internal.store.MutableMapK
import com.android.systemui.kairos.internal.util.hashString
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal class DemuxNode<K, A>(
    private val branchNodeByKey: ConcurrentHashMap<K, DemuxNode<K, A>.BranchNode>,
internal class DemuxNode<W, K, A>(
    private val branchNodeByKey: MutableMapK<W, K, DemuxNode<W, K, A>.BranchNode>,
    val lifecycle: DemuxLifecycle<K, A>,
    private val spec: DemuxActivator<K, A>,
    private val spec: DemuxActivator<W, K, A>,
) : SchedulableNode {

    val schedulable = Schedulable.N(this)
@@ -34,7 +37,7 @@ internal class DemuxNode<K, A>(
    inline val mutex
        get() = lifecycle.mutex

    lateinit var upstreamConnection: NodeConnection<Map<K, A>>
    lateinit var upstreamConnection: NodeConnection<MapK<W, K, A>>

    @Volatile private var epoch: Long = Long.MIN_VALUE

@@ -52,7 +55,10 @@ internal class DemuxNode<K, A>(
        mutex.withLock {
            updateEpoch(evalScope)
            for ((key, _) in upstreamResult) {
                branchNodeByKey[key]?.let { branch -> launch { branch.schedule(evalScope) } }
                if (key !in branchNodeByKey) continue
                val branch = branchNodeByKey.getValue(key)
                // TODO: launchImmediate?
                launch { branch.schedule(evalScope) }
            }
        }
    }
@@ -75,7 +81,7 @@ internal class DemuxNode<K, A>(
    override suspend fun moveIndirectUpstreamToDirect(
        scheduler: Scheduler,
        oldIndirectDepth: Int,
        oldIndirectSet: Set<MuxDeferredNode<*, *>>,
        oldIndirectSet: Set<MuxDeferredNode<*, *, *>>,
        newDirectDepth: Int,
    ) {
        coroutineScope {
@@ -97,8 +103,8 @@ internal class DemuxNode<K, A>(
        scheduler: Scheduler,
        oldDepth: Int,
        newDepth: Int,
        removals: Set<MuxDeferredNode<*, *>>,
        additions: Set<MuxDeferredNode<*, *>>,
        removals: Set<MuxDeferredNode<*, *, *>>,
        additions: Set<MuxDeferredNode<*, *, *>>,
    ) {
        coroutineScope {
            mutex.withLock {
@@ -120,7 +126,7 @@ internal class DemuxNode<K, A>(
        scheduler: Scheduler,
        oldDirectDepth: Int,
        newIndirectDepth: Int,
        newIndirectSet: Set<MuxDeferredNode<*, *>>,
        newIndirectSet: Set<MuxDeferredNode<*, *, *>>,
    ) {
        coroutineScope {
            mutex.withLock {
@@ -140,7 +146,7 @@ internal class DemuxNode<K, A>(
    override suspend fun removeIndirectUpstream(
        scheduler: Scheduler,
        depth: Int,
        indirectSet: Set<MuxDeferredNode<*, *>>,
        indirectSet: Set<MuxDeferredNode<*, *, *>>,
    ) {
        coroutineScope {
            mutex.withLock {
@@ -245,25 +251,35 @@ internal class DemuxNode<K, A>(
    }
}

internal fun <K, A> DemuxImpl(
internal fun <W, K, A> DemuxImpl(
    upstream: TFlowImpl<MapK<W, K, A>>,
    numKeys: Int?,
    storeFactory: MutableMapK.Factory<W, K>,
): DemuxImpl<K, A> =
    DemuxImpl(
        DemuxLifecycle(
            DemuxLifecycleState.Inactive(DemuxActivator(numKeys, upstream, storeFactory))
        )
    )

internal fun <K, A> demuxMap(
    upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>,
    numKeys: Int?,
): DemuxImpl<K, A> =
    DemuxImpl(DemuxLifecycle(DemuxLifecycleState.Inactive(DemuxActivator(numKeys, upstream))))
    DemuxImpl(mapImpl(upstream) { MapHolder(it) }, numKeys, ConcurrentHashMapK.Factory())

internal class DemuxActivator<K, A>(
internal class DemuxActivator<W, K, A>(
    private val numKeys: Int?,
    private val upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>,
    private val upstream: TFlowImpl<MapK<W, K, A>>,
    private val storeFactory: MutableMapK.Factory<W, K>,
) {
    suspend fun activate(
        evalScope: EvalScope,
        lifecycle: DemuxLifecycle<K, A>,
    ): Pair<DemuxNode<K, A>, Set<K>>? {
        val demux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this)
        return upstream
            .invoke(evalScope)
            .activate(evalScope, downstream = demux.schedulable)
            ?.let { (conn, needsEval) ->
    ): Pair<DemuxNode<W, K, A>, Set<K>>? {
        val demux = DemuxNode(storeFactory.create(numKeys), lifecycle, this)
        return upstream.activate(evalScope, downstream = demux.schedulable)?.let { (conn, needsEval)
            ->
            Pair(
                demux.apply { upstreamConnection = conn },
                if (needsEval) {
@@ -295,7 +311,10 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle

    override fun toString(): String = "TFlowDmuxState[$hashString][$lifecycleState][$mutex]"

    suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxNode<K, A>.BranchNode, Boolean>? =
    suspend fun activate(
        evalScope: EvalScope,
        key: K,
    ): Pair<DemuxNode<*, K, A>.BranchNode, Boolean>? =
        mutex.withLock {
            when (val state = lifecycleState) {
                is DemuxLifecycleState.Dead -> null
@@ -322,11 +341,11 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle
}

internal sealed interface DemuxLifecycleState<out K, out A> {
    class Inactive<K, A>(val spec: DemuxActivator<K, A>) : DemuxLifecycleState<K, A> {
    class Inactive<K, A>(val spec: DemuxActivator<*, K, A>) : DemuxLifecycleState<K, A> {
        override fun toString(): String = "Inactive"
    }

    class Active<K, A>(val node: DemuxNode<K, A>) : DemuxLifecycleState<K, A> {
    class Active<K, A>(val node: DemuxNode<*, K, A>) : DemuxLifecycleState<K, A> {
        override fun toString(): String = "Active(node=$node)"
    }

Loading