Loading packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt +15 −13 Original line number Diff line number Diff line Loading @@ -26,11 +26,11 @@ import com.android.systemui.kairos.internal.TFlowImpl import com.android.systemui.kairos.internal.activated import com.android.systemui.kairos.internal.cached import com.android.systemui.kairos.internal.constInit import com.android.systemui.kairos.internal.filterNode import com.android.systemui.kairos.internal.filterImpl import com.android.systemui.kairos.internal.filterJustImpl import com.android.systemui.kairos.internal.init import com.android.systemui.kairos.internal.map import com.android.systemui.kairos.internal.mapImpl import com.android.systemui.kairos.internal.mapMaybeNode import com.android.systemui.kairos.internal.mergeNodes import com.android.systemui.kairos.internal.mergeNodesLeft import com.android.systemui.kairos.internal.neverImpl Loading Loading @@ -121,11 +121,8 @@ val <A> TState<A>.stateChanges: TFlow<A> * @see mapNotNull */ @ExperimentalFrpApi fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> { val pulse = mapMaybeNode({ init.connect(evalScope = this) }) { runInTransactionScope { transform(it) } } return TFlowInit(constInit(name = null, pulse)) } fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> = map(transform).filterJust() /** * Returns a [TFlow] that contains only the non-null results of applying [transform] to each value Loading @@ -140,14 +137,17 @@ fun <A, B> TFlow<A>.mapNotNull(transform: suspend FrpTransactionScope.(A) -> B?) } /** Returns a [TFlow] containing only values of the original [TFlow] that are not null. */ @ExperimentalFrpApi fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapNotNull { it } @ExperimentalFrpApi fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapCheap { it.toMaybe() }.filterJust() /** Shorthand for `mapNotNull { it as? A }`. */ @ExperimentalFrpApi inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapNotNull { it as? A } inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapCheap { it as? A }.filterNotNull() /** Shorthand for `mapMaybe { it }`. */ @ExperimentalFrpApi fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> = mapMaybe { it } @ExperimentalFrpApi fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> = TFlowInit(constInit(name = null, filterJustImpl { init.connect(evalScope = this) })) /** * Returns a [TFlow] containing the results of applying [transform] to each value of the original Loading Loading @@ -203,8 +203,8 @@ fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow< @ExperimentalFrpApi fun <A> TFlow<A>.filter(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> { val pulse = filterNode({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } } return TFlowInit(constInit(name = null, pulse.cached())) filterImpl({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } } return TFlowInit(constInit(name = null, pulse)) } /** Loading Loading @@ -455,7 +455,9 @@ fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> { mapImpl({ patches }) { newFlow -> mapOf(Unit to just(newFlow.init.connect(this))) } }, ) return TFlowInit(constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit) })) return TFlowInit( constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit).getPushEvent(this) }) ) } /** Loading packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt +2 −2 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ import com.android.systemui.kairos.internal.activated import com.android.systemui.kairos.internal.cached import com.android.systemui.kairos.internal.constInit import com.android.systemui.kairos.internal.constS import com.android.systemui.kairos.internal.filterNode import com.android.systemui.kairos.internal.filterImpl import com.android.systemui.kairos.internal.flatMap import com.android.systemui.kairos.internal.init import com.android.systemui.kairos.internal.map Loading Loading @@ -469,7 +469,7 @@ internal constructor(internal val network: Network, initialValue: Deferred<T>) : val operatorName = "MutableTState" lateinit var state: TStateSource<T> val calm: TFlowImpl<T> = filterNode({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new -> filterImpl({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new -> new != state.getCurrentWithEpoch(evalScope = this).first } .cached() Loading packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt +104 −119 Original line number Diff line number Diff line Loading @@ -14,25 +14,17 @@ * limitations under the License. */ @file:Suppress("NOTHING_TO_INLINE") package com.android.systemui.kairos.internal import com.android.systemui.kairos.internal.util.hashString import com.android.systemui.kairos.util.Just import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.flatMap import com.android.systemui.kairos.util.getMaybe import java.util.concurrent.ConcurrentHashMap import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock internal class DemuxNode<K, A>( private val branchNodeByKey: ConcurrentHashMap<K, DemuxBranchNode<K, A>>, private val branchNodeByKey: ConcurrentHashMap<K, DemuxNode<K, A>.BranchNode>, val lifecycle: DemuxLifecycle<K, A>, private val spec: DemuxActivator<K, A>, ) : SchedulableNode { Loading @@ -44,25 +36,23 @@ internal class DemuxNode<K, A>( lateinit var upstreamConnection: NodeConnection<Map<K, A>> fun getAndMaybeAddDownstream(key: K): DemuxBranchNode<K, A> = branchNodeByKey.getOrPut(key) { DemuxBranchNode(key, this) } @Volatile private var epoch: Long = Long.MIN_VALUE suspend fun hasCurrentValueLocked(evalScope: EvalScope, key: K): Boolean = evalScope.epoch == epoch && upstreamConnection.getPushEvent(evalScope).contains(key) suspend fun hasCurrentValue(evalScope: EvalScope, key: K): Boolean = mutex.withLock { hasCurrentValueLocked(evalScope, key) } fun getAndMaybeAddDownstream(key: K): BranchNode = branchNodeByKey.getOrPut(key) { BranchNode(key) } override suspend fun schedule(evalScope: EvalScope) { override suspend fun schedule(evalScope: EvalScope) = coroutineScope { val upstreamResult = upstreamConnection.getPushEvent(evalScope) if (upstreamResult is Just) { coroutineScope { val outerScope = this mutex.withLock { coroutineScope { for ((key, _) in upstreamResult.value) { launch { branchNodeByKey[key]?.let { branch -> outerScope.launch { branch.schedule(evalScope) } } } } } } updateEpoch(evalScope) for ((key, _) in upstreamResult) { branchNodeByKey[key]?.let { branch -> launch { branch.schedule(evalScope) } } } } } Loading Loading @@ -194,23 +184,27 @@ internal class DemuxNode<K, A>( upstreamConnection.removeDownstreamAndDeactivateIfNeeded(downstream = schedulable) } } fun updateEpoch(evalScope: EvalScope) { epoch = evalScope.epoch } internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNode<K, A>) : PushNode<A> { suspend fun getPushEvent(evalScope: EvalScope, key: K): A = upstreamConnection.getPushEvent(evalScope).getValue(key) inner class BranchNode(val key: K) : PushNode<A> { private val mutex = Mutex() val downstreamSet = DownstreamSet() override val depthTracker: DepthTracker get() = demuxNode.upstreamConnection.depthTracker get() = upstreamConnection.depthTracker override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean = demuxNode.upstreamConnection.hasCurrentValue(transactionStore) override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = hasCurrentValue(evalScope, key) override suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> = demuxNode.upstreamConnection.getPushEvent(evalScope).flatMap { it.getMaybe(key) } override suspend fun getPushEvent(evalScope: EvalScope): A = getPushEvent(evalScope, key) override suspend fun addDownstream(downstream: Schedulable) { mutex.withLock { downstreamSet.add(downstream) } Loading @@ -227,13 +221,13 @@ internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNod downstreamSet.isEmpty() } if (canDeactivate) { demuxNode.removeDownstreamAndDeactivateIfNeeded(key) removeDownstreamAndDeactivateIfNeeded(key) } } override suspend fun deactivateIfNeeded() { if (mutex.withLock { downstreamSet.isEmpty() }) { demuxNode.removeDownstreamAndDeactivateIfNeeded(key) removeDownstreamAndDeactivateIfNeeded(key) } } Loading @@ -249,35 +243,45 @@ internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNod } } } } internal fun <K, A> DemuxImpl( upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>, numKeys: Int?, ): DemuxImpl<K, A> = DemuxImpl( DemuxLifecycle( object : DemuxActivator<K, A> { override suspend fun activate( DemuxImpl(DemuxLifecycle(DemuxLifecycleState.Inactive(DemuxActivator(numKeys, upstream)))) internal class DemuxActivator<K, A>( private val numKeys: Int?, private val upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>, ) { suspend fun activate( evalScope: EvalScope, lifecycle: DemuxLifecycle<K, A>, ): Pair<DemuxNode<K, A>, Boolean>? { val dmux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this) ): Pair<DemuxNode<K, A>, Set<K>>? { val demux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this) return upstream .invoke(evalScope) .activate(evalScope, downstream = dmux.schedulable) .activate(evalScope, downstream = demux.schedulable) ?.let { (conn, needsEval) -> dmux.apply { upstreamConnection = conn } to needsEval Pair( demux.apply { upstreamConnection = conn }, if (needsEval) { demux.updateEpoch(evalScope) conn.getPushEvent(evalScope).keys } else { emptySet() }, ) } } } ) ) internal class DemuxImpl<in K, out A>(private val dmux: DemuxLifecycle<K, A>) { fun eventsForKey(key: K): TFlowImpl<A> = TFlowCheap { downstream -> dmux.activate(evalScope = this, key)?.let { (branchNode, needsEval) -> branchNode.addDownstream(downstream) val branchNeedsEval = needsEval && branchNode.getPushEvent(evalScope = this) is Just val branchNeedsEval = needsEval && branchNode.hasCurrentValue(evalScope = this) ActivationResult( connection = NodeConnection(branchNode, branchNode), needsEval = branchNeedsEval, Loading @@ -291,19 +295,13 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle override fun toString(): String = "TFlowDmuxState[$hashString][$lifecycleState][$mutex]" suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxBranchNode<K, A>, Boolean>? = coroutineScope { mutex .withLock { suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxNode<K, A>.BranchNode, Boolean>? = mutex.withLock { when (val state = lifecycleState) { is DemuxLifecycleState.Dead -> null is DemuxLifecycleState.Active -> state.node.getAndMaybeAddDownstream(key) to async { state.node.upstreamConnection.hasCurrentValue( evalScope.transactionStore ) } state.node.hasCurrentValueLocked(evalScope, key) is DemuxLifecycleState.Inactive -> { state.spec .activate(evalScope, this@DemuxLifecycle) Loading @@ -316,13 +314,10 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle } } ?.let { (node, needsEval) -> node.getAndMaybeAddDownstream(key) to CompletableDeferred(needsEval) } node.getAndMaybeAddDownstream(key) to (key in needsEval) } } } ?.let { (branch, result) -> branch to result.await() } } } Loading @@ -337,13 +332,3 @@ internal sealed interface DemuxLifecycleState<out K, out A> { data object Dead : DemuxLifecycleState<Nothing, Nothing> } internal interface DemuxActivator<K, A> { suspend fun activate( evalScope: EvalScope, lifecycle: DemuxLifecycle<K, A>, ): Pair<DemuxNode<K, A>, Boolean>? } internal inline fun <K, A> DemuxLifecycle(onSubscribe: DemuxActivator<K, A>) = DemuxLifecycle(DemuxLifecycleState.Inactive(onSubscribe)) packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt +1 −1 Original line number Diff line number Diff line Loading @@ -57,7 +57,7 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope) TStateInit( constInit( "now", mkState( activatedTStateSource( "now", "now", this, Loading packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt +7 −10 Original line number Diff line number Diff line Loading @@ -21,14 +21,12 @@ import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.none internal inline fun <A, B> mapMaybeNode( crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>, crossinline f: suspend EvalScope.(A) -> Maybe<B>, ): TFlowImpl<B> { return DemuxImpl( internal inline fun <A> filterJustImpl( crossinline getPulse: suspend EvalScope.() -> TFlowImpl<Maybe<A>> ): TFlowImpl<A> = DemuxImpl( { mapImpl(getPulse) { val maybeResult = f(it) mapImpl(getPulse) { maybeResult -> if (maybeResult is Just) { mapOf(Unit to maybeResult.value) } else { Loading @@ -39,9 +37,8 @@ internal inline fun <A, B> mapMaybeNode( numKeys = 1, ) .eventsForKey(Unit) } internal inline fun <A> filterNode( internal inline fun <A> filterImpl( crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>, crossinline f: suspend EvalScope.(A) -> Boolean, ): TFlowImpl<A> = mapMaybeNode(getPulse) { if (f(it)) just(it) else none } ): TFlowImpl<A> = filterJustImpl { mapImpl(getPulse) { if (f(it)) just(it) else none }.cached() } Loading
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TFlow.kt +15 −13 Original line number Diff line number Diff line Loading @@ -26,11 +26,11 @@ import com.android.systemui.kairos.internal.TFlowImpl import com.android.systemui.kairos.internal.activated import com.android.systemui.kairos.internal.cached import com.android.systemui.kairos.internal.constInit import com.android.systemui.kairos.internal.filterNode import com.android.systemui.kairos.internal.filterImpl import com.android.systemui.kairos.internal.filterJustImpl import com.android.systemui.kairos.internal.init import com.android.systemui.kairos.internal.map import com.android.systemui.kairos.internal.mapImpl import com.android.systemui.kairos.internal.mapMaybeNode import com.android.systemui.kairos.internal.mergeNodes import com.android.systemui.kairos.internal.mergeNodesLeft import com.android.systemui.kairos.internal.neverImpl Loading Loading @@ -121,11 +121,8 @@ val <A> TState<A>.stateChanges: TFlow<A> * @see mapNotNull */ @ExperimentalFrpApi fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> { val pulse = mapMaybeNode({ init.connect(evalScope = this) }) { runInTransactionScope { transform(it) } } return TFlowInit(constInit(name = null, pulse)) } fun <A, B> TFlow<A>.mapMaybe(transform: suspend FrpTransactionScope.(A) -> Maybe<B>): TFlow<B> = map(transform).filterJust() /** * Returns a [TFlow] that contains only the non-null results of applying [transform] to each value Loading @@ -140,14 +137,17 @@ fun <A, B> TFlow<A>.mapNotNull(transform: suspend FrpTransactionScope.(A) -> B?) } /** Returns a [TFlow] containing only values of the original [TFlow] that are not null. */ @ExperimentalFrpApi fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapNotNull { it } @ExperimentalFrpApi fun <A> TFlow<A?>.filterNotNull(): TFlow<A> = mapCheap { it.toMaybe() }.filterJust() /** Shorthand for `mapNotNull { it as? A }`. */ @ExperimentalFrpApi inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapNotNull { it as? A } inline fun <reified A> TFlow<*>.filterIsInstance(): TFlow<A> = mapCheap { it as? A }.filterNotNull() /** Shorthand for `mapMaybe { it }`. */ @ExperimentalFrpApi fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> = mapMaybe { it } @ExperimentalFrpApi fun <A> TFlow<Maybe<A>>.filterJust(): TFlow<A> = TFlowInit(constInit(name = null, filterJustImpl { init.connect(evalScope = this) })) /** * Returns a [TFlow] containing the results of applying [transform] to each value of the original Loading Loading @@ -203,8 +203,8 @@ fun <A> TFlow<A>.onEach(action: suspend FrpTransactionScope.(A) -> Unit): TFlow< @ExperimentalFrpApi fun <A> TFlow<A>.filter(predicate: suspend FrpTransactionScope.(A) -> Boolean): TFlow<A> { val pulse = filterNode({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } } return TFlowInit(constInit(name = null, pulse.cached())) filterImpl({ init.connect(evalScope = this) }) { runInTransactionScope { predicate(it) } } return TFlowInit(constInit(name = null, pulse)) } /** Loading Loading @@ -455,7 +455,9 @@ fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> { mapImpl({ patches }) { newFlow -> mapOf(Unit to just(newFlow.init.connect(this))) } }, ) return TFlowInit(constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit) })) return TFlowInit( constInit(name = null, mapImpl({ switchNode }) { it.getValue(Unit).getPushEvent(this) }) ) } /** Loading
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/TState.kt +2 −2 Original line number Diff line number Diff line Loading @@ -29,7 +29,7 @@ import com.android.systemui.kairos.internal.activated import com.android.systemui.kairos.internal.cached import com.android.systemui.kairos.internal.constInit import com.android.systemui.kairos.internal.constS import com.android.systemui.kairos.internal.filterNode import com.android.systemui.kairos.internal.filterImpl import com.android.systemui.kairos.internal.flatMap import com.android.systemui.kairos.internal.init import com.android.systemui.kairos.internal.map Loading Loading @@ -469,7 +469,7 @@ internal constructor(internal val network: Network, initialValue: Deferred<T>) : val operatorName = "MutableTState" lateinit var state: TStateSource<T> val calm: TFlowImpl<T> = filterNode({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new -> filterImpl({ mapImpl(upstream = { changes.activated() }) { it!!.await() } }) { new -> new != state.getCurrentWithEpoch(evalScope = this).first } .cached() Loading
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Demux.kt +104 −119 Original line number Diff line number Diff line Loading @@ -14,25 +14,17 @@ * limitations under the License. */ @file:Suppress("NOTHING_TO_INLINE") package com.android.systemui.kairos.internal import com.android.systemui.kairos.internal.util.hashString import com.android.systemui.kairos.util.Just import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.flatMap import com.android.systemui.kairos.util.getMaybe import java.util.concurrent.ConcurrentHashMap import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock internal class DemuxNode<K, A>( private val branchNodeByKey: ConcurrentHashMap<K, DemuxBranchNode<K, A>>, private val branchNodeByKey: ConcurrentHashMap<K, DemuxNode<K, A>.BranchNode>, val lifecycle: DemuxLifecycle<K, A>, private val spec: DemuxActivator<K, A>, ) : SchedulableNode { Loading @@ -44,25 +36,23 @@ internal class DemuxNode<K, A>( lateinit var upstreamConnection: NodeConnection<Map<K, A>> fun getAndMaybeAddDownstream(key: K): DemuxBranchNode<K, A> = branchNodeByKey.getOrPut(key) { DemuxBranchNode(key, this) } @Volatile private var epoch: Long = Long.MIN_VALUE suspend fun hasCurrentValueLocked(evalScope: EvalScope, key: K): Boolean = evalScope.epoch == epoch && upstreamConnection.getPushEvent(evalScope).contains(key) suspend fun hasCurrentValue(evalScope: EvalScope, key: K): Boolean = mutex.withLock { hasCurrentValueLocked(evalScope, key) } fun getAndMaybeAddDownstream(key: K): BranchNode = branchNodeByKey.getOrPut(key) { BranchNode(key) } override suspend fun schedule(evalScope: EvalScope) { override suspend fun schedule(evalScope: EvalScope) = coroutineScope { val upstreamResult = upstreamConnection.getPushEvent(evalScope) if (upstreamResult is Just) { coroutineScope { val outerScope = this mutex.withLock { coroutineScope { for ((key, _) in upstreamResult.value) { launch { branchNodeByKey[key]?.let { branch -> outerScope.launch { branch.schedule(evalScope) } } } } } } updateEpoch(evalScope) for ((key, _) in upstreamResult) { branchNodeByKey[key]?.let { branch -> launch { branch.schedule(evalScope) } } } } } Loading Loading @@ -194,23 +184,27 @@ internal class DemuxNode<K, A>( upstreamConnection.removeDownstreamAndDeactivateIfNeeded(downstream = schedulable) } } fun updateEpoch(evalScope: EvalScope) { epoch = evalScope.epoch } internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNode<K, A>) : PushNode<A> { suspend fun getPushEvent(evalScope: EvalScope, key: K): A = upstreamConnection.getPushEvent(evalScope).getValue(key) inner class BranchNode(val key: K) : PushNode<A> { private val mutex = Mutex() val downstreamSet = DownstreamSet() override val depthTracker: DepthTracker get() = demuxNode.upstreamConnection.depthTracker get() = upstreamConnection.depthTracker override suspend fun hasCurrentValue(transactionStore: TransactionStore): Boolean = demuxNode.upstreamConnection.hasCurrentValue(transactionStore) override suspend fun hasCurrentValue(evalScope: EvalScope): Boolean = hasCurrentValue(evalScope, key) override suspend fun getPushEvent(evalScope: EvalScope): Maybe<A> = demuxNode.upstreamConnection.getPushEvent(evalScope).flatMap { it.getMaybe(key) } override suspend fun getPushEvent(evalScope: EvalScope): A = getPushEvent(evalScope, key) override suspend fun addDownstream(downstream: Schedulable) { mutex.withLock { downstreamSet.add(downstream) } Loading @@ -227,13 +221,13 @@ internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNod downstreamSet.isEmpty() } if (canDeactivate) { demuxNode.removeDownstreamAndDeactivateIfNeeded(key) removeDownstreamAndDeactivateIfNeeded(key) } } override suspend fun deactivateIfNeeded() { if (mutex.withLock { downstreamSet.isEmpty() }) { demuxNode.removeDownstreamAndDeactivateIfNeeded(key) removeDownstreamAndDeactivateIfNeeded(key) } } Loading @@ -249,35 +243,45 @@ internal class DemuxBranchNode<K, A>(val key: K, private val demuxNode: DemuxNod } } } } internal fun <K, A> DemuxImpl( upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>, numKeys: Int?, ): DemuxImpl<K, A> = DemuxImpl( DemuxLifecycle( object : DemuxActivator<K, A> { override suspend fun activate( DemuxImpl(DemuxLifecycle(DemuxLifecycleState.Inactive(DemuxActivator(numKeys, upstream)))) internal class DemuxActivator<K, A>( private val numKeys: Int?, private val upstream: suspend EvalScope.() -> TFlowImpl<Map<K, A>>, ) { suspend fun activate( evalScope: EvalScope, lifecycle: DemuxLifecycle<K, A>, ): Pair<DemuxNode<K, A>, Boolean>? { val dmux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this) ): Pair<DemuxNode<K, A>, Set<K>>? { val demux = DemuxNode(ConcurrentHashMap(numKeys ?: 16), lifecycle, this) return upstream .invoke(evalScope) .activate(evalScope, downstream = dmux.schedulable) .activate(evalScope, downstream = demux.schedulable) ?.let { (conn, needsEval) -> dmux.apply { upstreamConnection = conn } to needsEval Pair( demux.apply { upstreamConnection = conn }, if (needsEval) { demux.updateEpoch(evalScope) conn.getPushEvent(evalScope).keys } else { emptySet() }, ) } } } ) ) internal class DemuxImpl<in K, out A>(private val dmux: DemuxLifecycle<K, A>) { fun eventsForKey(key: K): TFlowImpl<A> = TFlowCheap { downstream -> dmux.activate(evalScope = this, key)?.let { (branchNode, needsEval) -> branchNode.addDownstream(downstream) val branchNeedsEval = needsEval && branchNode.getPushEvent(evalScope = this) is Just val branchNeedsEval = needsEval && branchNode.hasCurrentValue(evalScope = this) ActivationResult( connection = NodeConnection(branchNode, branchNode), needsEval = branchNeedsEval, Loading @@ -291,19 +295,13 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle override fun toString(): String = "TFlowDmuxState[$hashString][$lifecycleState][$mutex]" suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxBranchNode<K, A>, Boolean>? = coroutineScope { mutex .withLock { suspend fun activate(evalScope: EvalScope, key: K): Pair<DemuxNode<K, A>.BranchNode, Boolean>? = mutex.withLock { when (val state = lifecycleState) { is DemuxLifecycleState.Dead -> null is DemuxLifecycleState.Active -> state.node.getAndMaybeAddDownstream(key) to async { state.node.upstreamConnection.hasCurrentValue( evalScope.transactionStore ) } state.node.hasCurrentValueLocked(evalScope, key) is DemuxLifecycleState.Inactive -> { state.spec .activate(evalScope, this@DemuxLifecycle) Loading @@ -316,13 +314,10 @@ internal class DemuxLifecycle<K, A>(@Volatile var lifecycleState: DemuxLifecycle } } ?.let { (node, needsEval) -> node.getAndMaybeAddDownstream(key) to CompletableDeferred(needsEval) } node.getAndMaybeAddDownstream(key) to (key in needsEval) } } } ?.let { (branch, result) -> branch to result.await() } } } Loading @@ -337,13 +332,3 @@ internal sealed interface DemuxLifecycleState<out K, out A> { data object Dead : DemuxLifecycleState<Nothing, Nothing> } internal interface DemuxActivator<K, A> { suspend fun activate( evalScope: EvalScope, lifecycle: DemuxLifecycle<K, A>, ): Pair<DemuxNode<K, A>, Boolean>? } internal inline fun <K, A> DemuxLifecycle(onSubscribe: DemuxActivator<K, A>) = DemuxLifecycle(DemuxLifecycleState.Inactive(onSubscribe))
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/EvalScopeImpl.kt +1 −1 Original line number Diff line number Diff line Loading @@ -57,7 +57,7 @@ internal class EvalScopeImpl(networkScope: NetworkScope, deferScope: DeferScope) TStateInit( constInit( "now", mkState( activatedTStateSource( "now", "now", this, Loading
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/FilterNode.kt +7 −10 Original line number Diff line number Diff line Loading @@ -21,14 +21,12 @@ import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.none internal inline fun <A, B> mapMaybeNode( crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>, crossinline f: suspend EvalScope.(A) -> Maybe<B>, ): TFlowImpl<B> { return DemuxImpl( internal inline fun <A> filterJustImpl( crossinline getPulse: suspend EvalScope.() -> TFlowImpl<Maybe<A>> ): TFlowImpl<A> = DemuxImpl( { mapImpl(getPulse) { val maybeResult = f(it) mapImpl(getPulse) { maybeResult -> if (maybeResult is Just) { mapOf(Unit to maybeResult.value) } else { Loading @@ -39,9 +37,8 @@ internal inline fun <A, B> mapMaybeNode( numKeys = 1, ) .eventsForKey(Unit) } internal inline fun <A> filterNode( internal inline fun <A> filterImpl( crossinline getPulse: suspend EvalScope.() -> TFlowImpl<A>, crossinline f: suspend EvalScope.(A) -> Boolean, ): TFlowImpl<A> = mapMaybeNode(getPulse) { if (f(it)) just(it) else none } ): TFlowImpl<A> = filterJustImpl { mapImpl(getPulse) { if (f(it)) just(it) else none }.cached() }