Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 7ae1f019 authored by Steve Elliott's avatar Steve Elliott
Browse files

[kairos] remove unncecessary scheduler channel

it's more efficient to insert directly into the queue

Flag: EXEMPT unused
Test: atest kairos-tests
Change-Id: Ifee897d7324e544c100b147762bc8bbaf30a252e
parent 00ad67ff
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -86,7 +86,7 @@ internal class DepthTracker {
    @Volatile private var dirty_depthIsDirect = true
    @Volatile private var dirty_isIndirectRoot = false

    suspend fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *>) {
    fun schedule(scheduler: Scheduler, node: MuxNode<*, *, *>) {
        if (dirty_depthIsDirect) {
            scheduler.schedule(dirty_directDepth, node)
        } else {
+1 −3
Original line number Diff line number Diff line
@@ -95,9 +95,7 @@ internal class Network(val coroutineScope: CoroutineScope) : NetworkScope {
    }

    /** Listens for external events and starts FRP transactions. Runs forever. */
    suspend fun runInputScheduler() = coroutineScope {
        launch { scheduler.activate() }
        launch { compactor.activate() }
    suspend fun runInputScheduler() {
        val actions = mutableListOf<ScheduledAction<*>>()
        for (first in inputScheduleChan) {
            // Drain and conflate all transaction requests into a single transaction
+7 −24
Original line number Diff line number Diff line
@@ -21,44 +21,34 @@ package com.android.systemui.kairos.internal
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.PriorityBlockingQueue
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

internal interface Scheduler {
    suspend fun schedule(depth: Int, node: MuxNode<*, *, *>)
    fun schedule(depth: Int, node: MuxNode<*, *, *>)

    suspend fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>)
    fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>)
}

internal class SchedulerImpl : Scheduler {
    val enqueued = ConcurrentHashMap<MuxNode<*, *, *>, Any>()
    val scheduledQ = PriorityBlockingQueue<Pair<Int, MuxNode<*, *, *>>>(16, compareBy { it.first })
    val chan = Channel<Pair<Int, MuxNode<*, *, *>>>(Channel.UNLIMITED)

    override suspend fun schedule(depth: Int, node: MuxNode<*, *, *>) {
    override fun schedule(depth: Int, node: MuxNode<*, *, *>) {
        if (enqueued.putIfAbsent(node, node) == null) {
            chan.send(Pair(depth, node))
            scheduledQ.add(Pair(depth, node))
        }
    }

    override suspend fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) {
    override fun scheduleIndirect(indirectDepth: Int, node: MuxNode<*, *, *>) {
        schedule(Int.MIN_VALUE + indirectDepth, node)
    }

    suspend fun activate() {
        for (nodeSchedule in chan) {
            scheduledQ.add(nodeSchedule)
            drainChan()
        }
    }

    internal suspend fun drainEval(network: Network) {
        drain { runStep ->
            runStep { muxNode -> network.evalScope { muxNode.visit(this) } }
            // If any visited MuxPromptNodes had their depths increased, eagerly propagate those
            // depth
            // changes now before performing further network evaluation.
            // depth changes now before performing further network evaluation.
            network.compactor.drainCompact()
        }
    }
@@ -71,19 +61,12 @@ internal class SchedulerImpl : Scheduler {
        crossinline onStep:
            suspend (runStep: suspend (visit: suspend (MuxNode<*, *, *>) -> Unit) -> Unit) -> Unit
    ): Unit = coroutineScope {
        while (!chan.isEmpty || scheduledQ.isNotEmpty()) {
            drainChan()
        while (scheduledQ.isNotEmpty()) {
            val maxDepth = scheduledQ.peek()?.first ?: error("Unexpected empty scheduler")
            onStep { visit -> runStep(maxDepth, visit) }
        }
    }

    private suspend fun drainChan() {
        while (!chan.isEmpty) {
            scheduledQ.add(chan.receive())
        }
    }

    private suspend inline fun runStep(
        maxDepth: Int,
        crossinline visit: suspend (MuxNode<*, *, *>) -> Unit,