Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c271f98d authored by Steve Elliott's avatar Steve Elliott
Browse files

[kairos] optimize pull nodes

- use epoch to determine if there is a result within the current
transaction

- have mux nodes produce lazier results

- eliminate Maybe indirection in transaction store for pull nodes

- separate out "mapping" from "filtering", allowing for more
fine-grained control over caching.

Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: I8ea329d40bca1e792cf38c96be444db202a91333
parent de547b23
Loading
Loading
Loading
Loading
+15 −13
Original line number Diff line number Diff line
@@ -26,11 +26,11 @@ import com.android.systemui.kairos.internal.TFlowImpl
import com.android.systemui.kairos.internal.activated
import com.android.systemui.kairos.internal.cached
import com.android.systemui.kairos.internal.constInit
import com.android.systemui.kairos.internal.filterNode
import com.android.systemui.kairos.internal.filterImpl
import com.android.systemui.kairos.internal.filterJustImpl
import com.android.systemui.kairos.internal.init
import com.android.systemui.kairos.internal.map
import com.android.systemui.kairos.internal.mapImpl
import com.android.systemui.kairos.internal.mapMaybeNode
import com.android.systemui.kairos.internal.mergeNodes
import com.android.systemui.kairos.internal.mergeNodesLeft
import com.android.systemui.kairos.internal.neverImpl
@@ -121,11 +121,8 @@ val <A> TState<A>.stateChanges: TFlow<A>
 * @see mapNotNull
 */
@ExperimentalFrpApi
fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> {
    val pulse =
        mapMaybeNode({ init.connect(evalScope = this) }) { runInTransactionScope { transform(it) } }
    return TFlowInit(constInit(name = null, pulse))
}
fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> =
    map(transform).filterJust()

/**
 * Returns a [TFlow] that contains only the non-null results of applying [transform] to each value
@@ -140,14 +137,17 @@ fun <A, B> TFlow<A>.mapNotNull(transform: suspend FrpTransactionScope.(A) -> B?)
    }

/** Returns a [TFlow] containing only values of the original [TFlow] that are not null. */
@ExperimentalFrpApi fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapNotNull { it }
@ExperimentalFrpApi
fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapCheap { it.toMaybe() }.filterJust()

/** Shorthand for `mapNotNull { it as? A }`. */
@ExperimentalFrpApi
inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapNotNull { it as? A }
inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapCheap { it as? A }.filterNotNull()

/** Shorthand for `mapMaybe { it }`. */
@ExperimentalFrpApi fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> = mapMaybe { it }
@ExperimentalFrpApi
fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> =
    TFlowInit(constInit(name = null, filterJustImpl { init.connect(evalScope = this) }))

/**
 * Returns a [TFlow] containing the results of applying [transform] to each value of the original
@@ -203,8 +203,8 @@ fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow<
@ExperimentalFrpApi
fun <A> TFlow<A>.filter(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> {
    val pulse =
        filterNode({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } }
    return TFlowInit(constInit(name = null, pulse.cached()))
        filterImpl({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } }
    return TFlowInit(constInit(name = null, pulse))
}

/**
@@ -455,7 +455,9 @@ fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> {
                mapImpl({ patches }) { newFlow -> mapOf(Unit to just(newFlow.init.connect(this))) }
            },
        )
    return TFlowInit(constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit) }))
    return TFlowInit(
        constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit).getPushEvent(this) })
    )
}

/**
+2 −2
Original line number Diff line number Diff line
@@ -29,7 +29,7 @@ import com.android.systemui.kairos.internal.activated
import com.android.systemui.kairos.internal.cached
import com.android.systemui.kairos.internal.constInit
import com.android.systemui.kairos.internal.constS
import com.android.systemui.kairos.internal.filterNode
import com.android.systemui.kairos.internal.filterImpl
import com.android.systemui.kairos.internal.flatMap
import com.android.systemui.kairos.internal.init
import com.android.systemui.kairos.internal.map
@@ -469,7 +469,7 @@ internal constructor(internal val network: Network, initialValue: Deferred<T>) :
        val operatorName = "MutableTState"
        lateinit var state: TStateSource<T>
        val calm: TFlowImpl<T> =
            filterNode({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new ->
            filterImpl({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new ->
                    new != state.getCurrentWithEpoch(evalScope = this).first
                }
                .cached()
+104 −119
Original line number Diff line number Diff line
@@ -14,25 +14,17 @@
 * limitations under the License.
 */

@file:Suppress("NOTHING_TO_INLINE")

package com.android.systemui.kairos.internal

import com.android.systemui.kairos.internal.util.hashString
import com.android.systemui.kairos.util.Just
import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.flatMap
import com.android.systemui.kairos.util.getMaybe
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

internal class DemuxNode<K, A>(
    private val branchNodeByKey: ConcurrentHashMap<K, DemuxBranchNode<K, A>>,
    private val branchNodeByKey: ConcurrentHashMap<K, DemuxNode<K, A>.BranchNode>,
    val lifecycle: DemuxLifecycle<K, A>,
    private val spec: DemuxActivator<K, A>,
) : SchedulableNode {
@@ -44,25 +36,23 @@ internal class DemuxNode<K, A>(

    lateinit var upstreamConnection: NodeConnection<Map<K, A>>

    fun getAndMaybeAddDownstream(key: K): DemuxBranchNode<K, A> =
        branchNodeByKey.getOrPut(key) { DemuxBranchNode(key, this) }
    @Volatile private var epoch: Long = Long.MIN_VALUE

    suspend fun hasCurrentValueLocked(evalScope: EvalScope, key: K): Boolean =
        evalScope.epoch == epoch && upstreamConnection.getPushEvent(evalScope).contains(key)

    suspend fun hasCurrentValue(evalScope: EvalScope, key: K): Boolean =
        mutex.withLock { hasCurrentValueLocked(evalScope, key) }

    fun getAndMaybeAddDownstream(key: K): BranchNode =
        branchNodeByKey.getOrPut(key) { BranchNode(key) }

    override suspend fun schedule(evalScope: EvalScope) {
    override suspend fun schedule(evalScope: EvalScope) = coroutineScope {
        val upstreamResult = upstreamConnection.getPushEvent(evalScope)
        if (upstreamResult is Just) {
            coroutineScope {
                val outerScope = this
        mutex.withLock {
                    coroutineScope {
                        for ((key, _) in upstreamResult.value) {
                            launch {
                                branchNodeByKey[key]?.let { branch ->
                                    outerScope.launch { branch.schedule(evalScope) }
                                }
                            }
                        }
                    }
                }
            updateEpoch(evalScope)
            for ((key, _) in upstreamResult) {
                branchNodeByKey[key]?.let { branch -> launch { branch.schedule(evalScope) } }
            }
        }
    }
@@ -194,23 +184,27 @@ internal class DemuxNode<K, A>(
            upstreamConnection.removeDownstreamAndDeactivateIfNeeded(downstream = schedulable)
        }
    }

    fun updateEpoch(evalScope: EvalScope) {
        epoch = evalScope.epoch
    }

internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNode<K, A>) :
    PushNode<A> {
    suspend fun getPushEvent(evalScope: EvalScope, key: K): A =
        upstreamConnection.getPushEvent(evalScope).getValue(key)

    inner class BranchNode(val key: K) : PushNode<A> {

        private val mutex = Mutex()

        val downstreamSet = DownstreamSet()

        override val depthTracker: DepthTracker
        get() = demuxNode.upstreamConnection.depthTracker
            get() = upstreamConnection.depthTracker

    override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean =
        demuxNode.upstreamConnection.hasCurrentValue(transactionStore)
        override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean =
            hasCurrentValue(evalScope, key)

    override suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> =
        demuxNode.upstreamConnection.getPushEvent(evalScope).flatMap { it.getMaybe(key) }
        override suspend fun getPushEvent(evalScope: EvalScope): A = getPushEvent(evalScope, key)

        override suspend fun addDownstream(downstream: Schedulable) {
            mutex.withLock { downstreamSet.add(downstream) }
@@ -227,13 +221,13 @@ internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNod
                    downstreamSet.isEmpty()
                }
            if (canDeactivate) {
            demuxNode.removeDownstreamAndDeactivateIfNeeded(key)
                removeDownstreamAndDeactivateIfNeeded(key)
            }
        }

        override suspend fun deactivateIfNeeded() {
            if (mutex.withLock { downstreamSet.isEmpty() }) {
            demuxNode.removeDownstreamAndDeactivateIfNeeded(key)
                removeDownstreamAndDeactivateIfNeeded(key)
            }
        }

@@ -249,35 +243,45 @@ internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNod
            }
        }
    }
}

internal fun <K, A> DemuxImpl(
    upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>,
    numKeys: Int?,
): DemuxImpl<K, A> =
    DemuxImpl(
        DemuxLifecycle(
            object : DemuxActivator<K, A> {
                override suspend fun activate(
    DemuxImpl(DemuxLifecycle(DemuxLifecycleState.Inactive(DemuxActivator(numKeys, upstream))))

internal class DemuxActivator<K, A>(
    private val numKeys: Int?,
    private val upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>,
) {
    suspend fun activate(
        evalScope: EvalScope,
        lifecycle: DemuxLifecycle<K, A>,
                ): Pair<DemuxNode<K, A>, Boolean>? {
                    val dmux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this)
    ): Pair<DemuxNode<K, A>, Set<K>>? {
        val demux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this)
        return upstream
            .invoke(evalScope)
                        .activate(evalScope, downstream = dmux.schedulable)
            .activate(evalScope, downstream = demux.schedulable)
            ?.let { (conn, needsEval) ->
                            dmux.apply { upstreamConnection = conn } to needsEval
                Pair(
                    demux.apply { upstreamConnection = conn },
                    if (needsEval) {
                        demux.updateEpoch(evalScope)
                        conn.getPushEvent(evalScope).keys
                    } else {
                        emptySet()
                    },
                )
            }
    }
}
        )
    )

internal class DemuxImpl<in K, out A>(private val dmux: DemuxLifecycle<K, A>) {
    fun eventsForKey(key: K): TFlowImpl<A> = TFlowCheap { downstream ->
        dmux.activate(evalScope = this, key)?.let { (branchNode, needsEval) ->
            branchNode.addDownstream(downstream)
            val branchNeedsEval = needsEval && branchNode.getPushEvent(evalScope = this) is Just
            val branchNeedsEval = needsEval && branchNode.hasCurrentValue(evalScope = this)
            ActivationResult(
                connection = NodeConnection(branchNode, branchNode),
                needsEval = branchNeedsEval,
@@ -291,19 +295,13 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle

    override fun toString(): String = "TFlowDmuxState[$hashString][$lifecycleState][$mutex]"

    suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxBranchNode<K, A>, Boolean>? =
        coroutineScope {
            mutex
                .withLock {
    suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxNode<K, A>.BranchNode, Boolean>? =
        mutex.withLock {
            when (val state = lifecycleState) {
                is DemuxLifecycleState.Dead -> null
                is DemuxLifecycleState.Active ->
                    state.node.getAndMaybeAddDownstream(key) to
                                async {
                                    state.node.upstreamConnection.hasCurrentValue(
                                        evalScope.transactionStore
                                    )
                                }
                        state.node.hasCurrentValueLocked(evalScope, key)
                is DemuxLifecycleState.Inactive -> {
                    state.spec
                        .activate(evalScope, this@DemuxLifecycle)
@@ -316,13 +314,10 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle
                                }
                        }
                        ?.let { (node, needsEval) ->
                                    node.getAndMaybeAddDownstream(key) to
                                        CompletableDeferred(needsEval)
                                }
                            node.getAndMaybeAddDownstream(key) to (key in needsEval)
                        }
                }
            }
                ?.let { (branch, result) -> branch to result.await() }
        }
}

@@ -337,13 +332,3 @@ internal sealed interface DemuxLifecycleState<out K, out A> {

    data object Dead : DemuxLifecycleState<Nothing, Nothing>
}

internal interface DemuxActivator<K, A> {
    suspend fun activate(
        evalScope: EvalScope,
        lifecycle: DemuxLifecycle<K, A>,
    ): Pair<DemuxNode<K, A>, Boolean>?
}

internal inline fun <K, A> DemuxLifecycle(onSubscribe: DemuxActivator<K, A>) =
    DemuxLifecycle(DemuxLifecycleState.Inactive(onSubscribe))
+1 −1
Original line number Diff line number Diff line
@@ -57,7 +57,7 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope)
            TStateInit(
                    constInit(
                        "now",
                        mkState(
                        activatedTStateSource(
                            "now",
                            "now",
                            this,
+7 −10
Original line number Diff line number Diff line
@@ -21,14 +21,12 @@ import com.android.systemui.kairos.util.Maybe
import com.android.systemui.kairos.util.just
import com.android.systemui.kairos.util.none

internal inline fun <A, B> mapMaybeNode(
    crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>,
    crossinline f: suspend EvalScope.(A) -> Maybe<B>,
): TFlowImpl<B> {
    return DemuxImpl(
internal inline fun <A> filterJustImpl(
    crossinline getPulse: suspend EvalScope.() -> TFlowImpl<Maybe<A>>
): TFlowImpl<A> =
    DemuxImpl(
            {
                mapImpl(getPulse) {
                    val maybeResult = f(it)
                mapImpl(getPulse) { maybeResult ->
                    if (maybeResult is Just) {
                        mapOf(Unit to maybeResult.value)
                    } else {
@@ -39,9 +37,8 @@ internal inline fun <A, B> mapMaybeNode(
            numKeys = 1,
        )
        .eventsForKey(Unit)
}

internal inline fun <A> filterNode(
internal inline fun <A> filterImpl(
    crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>,
    crossinline f: suspend EvalScope.(A) -> Boolean,
): TFlowImpl<A> = mapMaybeNode(getPulse) { if (f(it)) just(it) else none }
): TFlowImpl<A> = filterJustImpl { mapImpl(getPulse) { if (f(it)) just(it) else none }.cached() }
Loading