Loading packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt +1 −1 Original line number Diff line number Diff line Loading @@ -86,7 +86,7 @@ internal class DepthTracker { @Volatile private var dirty_depthIsDirect = true @Volatile private var dirty_isIndirectRoot = false suspend fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *>) { fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *>) { if (dirty_depthIsDirect) { scheduler.schedule(dirty_directDepth, node) } else { Loading packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt +1 −3 Original line number Diff line number Diff line Loading @@ -95,9 +95,7 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { } /** Listens for external events and starts FRP transactions. Runs forever. */ suspend fun runInputScheduler() = coroutineScope { launch { scheduler.activate() } launch { compactor.activate() } suspend fun runInputScheduler() { val actions = mutableListOf<ScheduledAction<*>>() for (first in inputScheduleChan) { // Drain and conflate all transaction requests into a single transaction Loading packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt +7 −24 Original line number Diff line number Diff line Loading @@ -21,44 +21,34 @@ package com.android.systemui.kairos.internal import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.PriorityBlockingQueue import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch internal interface Scheduler { suspend fun schedule(depth: Int, node: MuxNode<*, *, *>) fun schedule(depth: Int, node: MuxNode<*, *, *>) suspend fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) } internal class SchedulerImpl : Scheduler { val enqueued = ConcurrentHashMap<MuxNode<*, *, *>, Any>() val scheduledQ = PriorityBlockingQueue<Pair<Int, MuxNode<*, *, *>>>(16, compareBy { it.first }) val chan = Channel<Pair<Int, MuxNode<*, *, *>>>(Channel.UNLIMITED) override suspend fun schedule(depth: Int, node: MuxNode<*, *, *>) { override fun schedule(depth: Int, node: MuxNode<*, *, *>) { if (enqueued.putIfAbsent(node, node) == null) { chan.send(Pair(depth, node)) scheduledQ.add(Pair(depth, node)) } } override suspend fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) { override fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) { schedule(Int.MIN_VALUE + indirectDepth, node) } suspend fun activate() { for (nodeSchedule in chan) { scheduledQ.add(nodeSchedule) drainChan() } } internal suspend fun drainEval(network: Network) { drain { runStep -> runStep { muxNode -> network.evalScope { muxNode.visit(this) } } // If any visited MuxPromptNodes had their depths increased, eagerly propagate those // depth // changes now before performing further network evaluation. // depth changes now before performing further network evaluation. network.compactor.drainCompact() } } Loading @@ -71,19 +61,12 @@ internal class SchedulerImpl : Scheduler { crossinline onStep: suspend (runStep: suspend (visit: suspend (MuxNode<*, *, *>) -> Unit) -> Unit) -> Unit ): Unit = coroutineScope { while (!chan.isEmpty || scheduledQ.isNotEmpty()) { drainChan() while (scheduledQ.isNotEmpty()) { val maxDepth = scheduledQ.peek()?.first ?: error("Unexpected empty scheduler") onStep { visit -> runStep(maxDepth, visit) } } } private suspend fun drainChan() { while (!chan.isEmpty) { scheduledQ.add(chan.receive()) } } private suspend inline fun runStep( maxDepth: Int, crossinline visit: suspend (MuxNode<*, *, *>) -> Unit, Loading Loading
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Graph.kt +1 −1 Original line number Diff line number Diff line Loading @@ -86,7 +86,7 @@ internal class DepthTracker { @Volatile private var dirty_depthIsDirect = true @Volatile private var dirty_isIndirectRoot = false suspend fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *>) { fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *>) { if (dirty_depthIsDirect) { scheduler.schedule(dirty_directDepth, node) } else { Loading
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Network.kt +1 −3 Original line number Diff line number Diff line Loading @@ -95,9 +95,7 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope { } /** Listens for external events and starts FRP transactions. Runs forever. */ suspend fun runInputScheduler() = coroutineScope { launch { scheduler.activate() } launch { compactor.activate() } suspend fun runInputScheduler() { val actions = mutableListOf<ScheduledAction<*>>() for (first in inputScheduleChan) { // Drain and conflate all transaction requests into a single transaction Loading
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/internal/Scheduler.kt +7 −24 Original line number Diff line number Diff line Loading @@ -21,44 +21,34 @@ package com.android.systemui.kairos.internal import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.PriorityBlockingQueue import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.launch internal interface Scheduler { suspend fun schedule(depth: Int, node: MuxNode<*, *, *>) fun schedule(depth: Int, node: MuxNode<*, *, *>) suspend fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) } internal class SchedulerImpl : Scheduler { val enqueued = ConcurrentHashMap<MuxNode<*, *, *>, Any>() val scheduledQ = PriorityBlockingQueue<Pair<Int, MuxNode<*, *, *>>>(16, compareBy { it.first }) val chan = Channel<Pair<Int, MuxNode<*, *, *>>>(Channel.UNLIMITED) override suspend fun schedule(depth: Int, node: MuxNode<*, *, *>) { override fun schedule(depth: Int, node: MuxNode<*, *, *>) { if (enqueued.putIfAbsent(node, node) == null) { chan.send(Pair(depth, node)) scheduledQ.add(Pair(depth, node)) } } override suspend fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) { override fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) { schedule(Int.MIN_VALUE + indirectDepth, node) } suspend fun activate() { for (nodeSchedule in chan) { scheduledQ.add(nodeSchedule) drainChan() } } internal suspend fun drainEval(network: Network) { drain { runStep -> runStep { muxNode -> network.evalScope { muxNode.visit(this) } } // If any visited MuxPromptNodes had their depths increased, eagerly propagate those // depth // changes now before performing further network evaluation. // depth changes now before performing further network evaluation. network.compactor.drainCompact() } } Loading @@ -71,19 +61,12 @@ internal class SchedulerImpl : Scheduler { crossinline onStep: suspend (runStep: suspend (visit: suspend (MuxNode<*, *, *>) -> Unit) -> Unit) -> Unit ): Unit = coroutineScope { while (!chan.isEmpty || scheduledQ.isNotEmpty()) { drainChan() while (scheduledQ.isNotEmpty()) { val maxDepth = scheduledQ.peek()?.first ?: error("Unexpected empty scheduler") onStep { visit -> runStep(maxDepth, visit) } } } private suspend fun drainChan() { while (!chan.isEmpty) { scheduledQ.add(chan.receive()) } } private suspend inline fun runStep( maxDepth: Int, crossinline visit: suspend (MuxNode<*, *, *>) -> Unit, Loading