Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 1cd4c618 authored by Steve Elliott's avatar Steve Elliott
Browse files

[kairos] introduce sync observation APIs

Flag: com.android.systemui.status_bar_mobile_icon_kairos
Bug: 383172066
Test: atest
Change-Id: Id948be9923c74ebf388e6e5824b7f99ba060dbd0
parent eb361acd
Loading
Loading
Loading
Loading
+3 −3
Original line number Diff line number Diff line
@@ -20,7 +20,7 @@ import com.android.systemui.kairos.BuildScope
import com.android.systemui.kairos.ExperimentalKairosApi
import com.android.systemui.kairos.State
import com.android.systemui.kairos.changes
import com.android.systemui.kairos.effect
import com.android.systemui.kairos.effectSync
import com.android.systemui.util.kotlin.pairwiseBy
import kotlinx.coroutines.flow.Flow

@@ -221,13 +221,13 @@ fun <T : Diffable<T>> BuildScope.logDiffsForTable(
    columnPrefix: String = "",
) {
    val initialValue = diffableState.sampleDeferred()
    effect {
    effectSync {
        // Fully log the initial value to the table.
        tableLogBuffer.logChange(columnPrefix, isInitial = true) { row ->
            initialValue.value.logFull(row)
        }
    }
    diffableState.changes.observe { newState ->
    diffableState.changes.observeSync { newState ->
        val prevState = diffableState.sample()
        tableLogBuffer.logDiffs(columnPrefix, prevVal = prevState, newVal = newState)
    }
+39 −32
Original line number Diff line number Diff line
@@ -136,8 +136,9 @@ object MobileIconBinderKairos {
        val roamingSpace = view.requireViewById<Space>(R.id.mobile_roaming_space)
        val dotView = view.requireViewById<StatusBarIconView>(R.id.status_bar_dot)

        val isVisible = viewModel.isVisible.sample()
        effect {
            view.isVisible = viewModel.isVisible.sample()
            view.isVisible = isVisible
            iconView.isVisible = true
            launch {
                view.repeatWhenAttachedToWindow {
@@ -182,6 +183,7 @@ object MobileIconBinderKairos {
                                    oldIcon is SignalIconModel.Cellular &&
                                        newIcon is SignalIconModel.Cellular ->
                                        oldIcon.numberOfLevels != newIcon.numberOfLevels

                                    else -> false
                                }
                            viewModel.verboseLogger?.logBinderReceivedSignalIcon(
@@ -222,18 +224,21 @@ object MobileIconBinderKairos {
                        }

                        // Set the network type background
                        viewModel.networkTypeBackground.observe { background ->

                        viewModel.networkTypeIcon
                            .mapTransactionally { it to binding.iconTint.sample() }
                            .observe { (background, iconTintColors) ->
                                networkTypeContainer.setBackgroundResource(background?.res ?: 0)

                                // Tint will invert when this bit changes
                                if (background?.res != null) {
                                    networkTypeContainer.backgroundTintList =
                                    ColorStateList.valueOf(binding.iconTint.sample().tint)
                                        ColorStateList.valueOf(iconTintColors.tint)
                                    networkTypeView.imageTintList =
                                    ColorStateList.valueOf(binding.iconTint.sample().contrast)
                                        ColorStateList.valueOf(iconTintColors.contrast)
                                } else {
                                    networkTypeView.imageTintList =
                                    ColorStateList.valueOf(binding.iconTint.sample().tint)
                                        ColorStateList.valueOf(iconTintColors.tint)
                                }
                            }

@@ -266,14 +271,16 @@ object MobileIconBinderKairos {
                        }

                        // Set the tint
                        binding.iconTint.observe { colors ->
                        binding.iconTint
                            .mapTransactionally { it to viewModel.networkTypeBackground.sample() }
                            .observe { (colors, networkTypeBackground) ->
                                val tint = ColorStateList.valueOf(colors.tint)
                                val contrast = ColorStateList.valueOf(colors.contrast)

                                iconView.imageTintList = tint

                                // If the bg is visible, tint it and use the contrast for the fg
                            if (viewModel.networkTypeBackground.sample() != null) {
                                if (networkTypeBackground != null) {
                                    networkTypeContainer.backgroundTintList = tint
                                    networkTypeView.imageTintList = contrast
                                } else {
+99 −22
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import kotlin.coroutines.EmptyCoroutineContext
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.Deferred
import kotlinx.coroutines.DisposableHandle
import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.awaitCancellation
import kotlinx.coroutines.flow.Flow
@@ -113,9 +114,29 @@ interface BuildScope : HasNetwork, StateScope {
    // TODO: remove disposable handle return? might add more confusion than convenience
    fun <A> Events<A>.observe(
        coroutineContext: CoroutineContext = EmptyCoroutineContext,
        block: EffectScope.(A) -> Unit = {},
        block: suspend EffectScope.(A) -> Unit,
    ): DisposableHandle

    /**
     * Invokes [block] whenever this [Events] emits a value, allowing side-effects to be safely
     * performed in reaction to the emission.
     *
     * Specifically, [block] is deferred to the end of the transaction, and is only actually
     * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
     * -Latest combinator, for example.
     *
     * Note that unlike with [observe], [block] will be invoked *synchronously* within the current
     * transaction. This means that it will run on whatever thread the current transaction is
     * running on (determined by the [dispatcher][kotlinx.coroutines.CoroutineDispatcher] used to
     * stand up the [KairosNetwork]), and will ignore whatever dispatcher is specified for this
     * [BuildScope]. This avoids the overhead associated with the dispatcher.
     *
     * Generally, you should prefer [observe] over this method.
     *
     * @see observe
     */
    fun <A> Events<A>.observeSync(block: TransactionEffectScope.(A) -> Unit = {}): DisposableHandle

    /**
     * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the
     * original [Events], and a [DeferredValue] containing the result of applying [initialSpecs]
@@ -169,7 +190,7 @@ interface BuildScope : HasNetwork, StateScope {
     */
    // TODO: see TODO for [events]
    fun <In, Out> coalescingEvents(
        getInitialValue: () -> Out,
        getInitialValue: KairosScope.() -> Out,
        coalesce: (old: Out, new: In) -> Out,
        builder: suspend CoalescingEventProducerScope<In>.() -> Unit,
    ): Events<Out>
@@ -214,16 +235,17 @@ interface BuildScope : HasNetwork, StateScope {
     *
     * @see observe
     */
    fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
        mapBuild(block).observe()
    fun <A> Events<A>.observeBuild(block: BuildScope.(A) -> Unit): DisposableHandle =
        mapBuild(block).observeSync()

    /**
     * Returns a [StateFlow] whose [value][StateFlow.value] tracks the current
     * [value of this State][State.sample], and will emit at the same rate as [State.changes].
     */
    @OptIn(ExperimentalForInheritanceCoroutinesApi::class)
    fun <A> State<A>.toStateFlow(): StateFlow<A> {
        val innerStateFlow = MutableStateFlow(sampleDeferred())
        changes.observe { innerStateFlow.value = deferredOf(it) }
        changes.observeSync { innerStateFlow.value = deferredOf(it) }
        return object : StateFlow<A> {
            override val replayCache: List<A>
                get() = innerStateFlow.replayCache.map { it.value }
@@ -245,7 +267,7 @@ interface BuildScope : HasNetwork, StateScope {
        val result = MutableSharedFlow<A>(replay, extraBufferCapacity = 1)
        deferredBuildScope {
            result.tryEmit(sample())
            changes.observe { a -> result.tryEmit(a) }
            changes.observeSync { a -> result.tryEmit(a) }
        }
        return result
    }
@@ -256,7 +278,7 @@ interface BuildScope : HasNetwork, StateScope {
     */
    fun <A> Events<A>.toSharedFlow(replay: Int = 0): SharedFlow<A> {
        val result = MutableSharedFlow<A>(replay, extraBufferCapacity = 1)
        observe { a -> result.tryEmit(a) }
        observeSync { a -> result.tryEmit(a) }
        return result
    }

@@ -558,7 +580,7 @@ interface BuildScope : HasNetwork, StateScope {
    /** Returns a [Deferred] containing the next value to be emitted from this [Events]. */
    fun <R> Events<R>.nextDeferred(): Deferred<R> {
        lateinit var next: CompletableDeferred<R>
        val job = launchScope { nextOnly().observe { next.complete(it) } }
        val job = launchScope { nextOnly().observeSync { next.complete(it) } }
        next = CompletableDeferred(parent = job)
        return next
    }
@@ -607,8 +629,8 @@ interface BuildScope : HasNetwork, StateScope {
     * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are
     * cancelled).
     */
    fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): DisposableHandle =
        mapLatestBuild { block(it) }.observe()
    fun <A> Events<A>.observeLatestBuild(block: BuildScope.(A) -> Unit): DisposableHandle =
        mapLatestBuild { block(it) }.observeSync()

    /**
     * Invokes [block] whenever this [Events] emits a value, allowing side-effects to be safely
@@ -616,7 +638,7 @@ interface BuildScope : HasNetwork, StateScope {
     *
     * With each invocation of [block], running effects from the previous invocation are cancelled.
     */
    fun <A> Events<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): DisposableHandle {
    fun <A> Events<A>.observeLatest(block: suspend EffectScope.(A) -> Unit): DisposableHandle {
        var innerJob: Job? = null
        return observeBuild {
            innerJob?.cancel()
@@ -630,11 +652,11 @@ interface BuildScope : HasNetwork, StateScope {
     *
     * With each invocation of [block], running effects from the previous invocation are cancelled.
     */
    fun <A> State<A>.observeLatest(block: EffectScope.(A) -> Unit = {}): Job = launchScope {
        var innerJob = effect { block(sample()) }
    fun <A> State<A>.observeLatest(block: TransactionEffectScope.(A) -> Unit): Job = launchScope {
        var innerJob = effectSync { block(sample()) }
        changes.observeBuild {
            innerJob.cancel()
            innerJob = effect { block(it) }
            innerJob = effectSync { block(it) }
        }
    }

@@ -647,7 +669,7 @@ interface BuildScope : HasNetwork, StateScope {
     * each invocation of [block], changes from the previous invocation are undone (any registered
     * [observers][observe] are unregistered, and any pending [side-effects][effect] are cancelled).
     */
    fun <A> State<A>.observeLatestBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope {
    fun <A> State<A>.observeLatestBuild(block: BuildScope.(A) -> Unit): Job = launchScope {
        var innerJob: Job = launchScope { block(sample()) }
        changes.observeBuild {
            innerJob.cancel()
@@ -669,20 +691,20 @@ interface BuildScope : HasNetwork, StateScope {
     * [effect].
     *
     * ```
     *     fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope {
     *     fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit): Job = launchScope {
     *         block(sample())
     *         changes.observeBuild(block)
     *     }
     * ```
     */
    fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope {
    fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit): Job = launchScope {
        block(sample())
        changes.observeBuild(block)
    }

    /**
     * Invokes [block] with the current value of this [State], re-invoking whenever it changes,
     * allowing side-effects to be safely performed in reaction value changing.
     * allowing side-effects to be safely performed in reaction to the value changing.
     *
     * Specifically, [block] is deferred to the end of the transaction, and is only actually
     * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
@@ -694,9 +716,34 @@ interface BuildScope : HasNetwork, StateScope {
     */
    fun <A> State<A>.observe(
        coroutineContext: CoroutineContext = EmptyCoroutineContext,
        block: EffectScope.(A) -> Unit = {},
        block: suspend EffectScope.(A) -> Unit,
    ): DisposableHandle =
        now.map { sample() }.mergeWith(changes) { _, new -> new }.observe(coroutineContext, block)

    /**
     * Invokes [block] with the current value of this [State], re-invoking whenever it changes,
     * allowing side-effects to be safely performed in reaction to the value changing.
     *
     * Specifically, [block] is deferred to the end of the transaction, and is only actually
     * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
     * -Latest combinator, for example.
     *
     * If the [State] is changing within the *current* transaction (i.e. [changes] is presently
     * emitting) then [block] will be invoked for the first time with the new value; otherwise, it
     * will be invoked with the [current][sample] value.
     *
     * Note that unlike with [observe], [block] will be invoked *synchronously* within the current
     * transaction. This means that it will run on whatever thread the current transaction is
     * running on (determined by the [dispatcher][kotlinx.coroutines.CoroutineDispatcher] used to
     * stand up the [KairosNetwork]), and will ignore whatever dispatcher is specified for this
     * [BuildScope]. This avoids the overhead associated with the dispatcher.
     *
     * Generally, you should prefer [observe] over this method.
     *
     * @see observe
     */
    fun <A> State<A>.observeSync(block: TransactionEffectScope.(A) -> Unit = {}): DisposableHandle =
        now.map { sample() }.mergeWith(changes) { _, new -> new }.observeSync(block)
}

/**
@@ -710,13 +757,13 @@ interface BuildScope : HasNetwork, StateScope {
 * ```
 */
@ExperimentalKairosApi
fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> =
fun <A> BuildScope.asyncEvent(block: suspend KairosScope.() -> A): Events<A> =
    events {
            // TODO: if block completes synchronously, it would be nice to emit within this
            //  transaction
            emit(block())
        }
        .apply { observe() }
        .apply { observeSync() }

/**
 * Performs a side-effect in a safe manner w/r/t the current Kairos transaction.
@@ -736,9 +783,39 @@ fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> =
@ExperimentalKairosApi
fun BuildScope.effect(
    context: CoroutineContext = EmptyCoroutineContext,
    block: EffectScope.() -> Unit,
    block: suspend EffectScope.() -> Unit,
): Job = launchScope(context) { now.observe { block() } }

/**
 * Performs a side-effect in a safe manner w/r/t the current Kairos transaction.
 *
 * Specifically, [block] is deferred to the end of the current transaction, and is only actually
 * executed if this [BuildScope] is still active by that time. It can be deactivated due to a
 * -Latest combinator, for example.
 *
 * Note that unlike with [effect], [block] will be invoked *synchronously* within the current
 * transaction. This means that it will run on whatever thread the current transaction is running on
 * (determined by the [dispatcher][kotlinx.coroutines.CoroutineDispatcher] used to stand up the
 * [KairosNetwork]), and will ignore whatever dispatcher is specified for this [BuildScope]. This
 * avoids the overhead associated with the dispatcher.
 *
 * Generally, you should prefer [effect] over this method.
 *
 * ```
 *   fun BuildScope.effectImmediate(
 *       context: CoroutineContext = EmptyCoroutineContext,
 *       block: EffectScope.() -> Unit,
 *   ): Job =
 *       launchScope(context) { now.observeImmediate { block() } }
 * ```
 *
 * @see effect
 */
@ExperimentalKairosApi
fun BuildScope.effectSync(block: TransactionEffectScope.() -> Unit): Job = launchScope {
    now.observeSync { block() }
}

/**
 * Launches [block] in a new coroutine, returning a [Job] bound to the coroutine.
 *
+21 −5
Original line number Diff line number Diff line
@@ -26,13 +26,12 @@ import kotlinx.coroutines.Job
/**
 * Scope for external side-effects triggered by the Kairos network.
 *
 * This still occurs within the context of a transaction, so general suspending calls are disallowed
 * to prevent blocking the transaction. You can [launch] new coroutines to perform long-running
 * asynchronous work. These coroutines are kept alive for the duration of the containing
 * [BuildScope] that this side-effect scope is running in.
 * You can [launch] new coroutines to perform long-running asynchronous work. These coroutines are
 * kept alive for the duration of the containing [BuildScope] that this side-effect scope is running
 * in.
 */
@ExperimentalKairosApi
interface EffectScope : HasNetwork, TransactionScope {
interface EffectScope : HasNetwork {
    /**
     * Creates a coroutine that is a child of this [EffectScope], and returns its future result as a
     * [Deferred].
@@ -58,4 +57,21 @@ interface EffectScope : HasNetwork, TransactionScope {
    ): Job = async(context, start, block)
}

/**
 * A combination of an [EffectScope] and a [TransactionScope], available within the lambda arguments
 * passed to [BuildScope] `-Sync` APIs, such as [BuildScope.observeSync].
 *
 * This scope occurs within the context of a transaction, allowing you to
 * [sample][TransactionScope.sample] states, but general suspending calls are disallowed to prevent
 * blocking the transaction. You can [launch] new coroutines to perform long-running asynchronous
 * work. These coroutines are kept alive for the duration of the containing [BuildScope] that this
 * side-effect scope is running in.
 */
@ExperimentalKairosApi interface TransactionEffectScope : EffectScope, TransactionScope

/**
 * A [CoroutineScope] that also has access to a [KairosNetwork] that is bound to the former. All
 * usages of [KairosNetwork.activateSpec] with the [kairosNetwork] will be canceled when this
 * coroutine scope is canceled.
 */
@ExperimentalKairosApi interface KairosCoroutineScope : HasNetwork, CoroutineScope
+1 −1
Original line number Diff line number Diff line
@@ -137,7 +137,7 @@ fun <K, V, U> Incremental<K, V>.mapValues(
 * [loopback] is unset before it is [observed][BuildScope.observe] or
 * [sampled][TransactionScope.sample]. Note that it is safe to invoke
 * [TransactionScope.sampleDeferred] before [loopback] is set, provided the [DeferredValue] is not
 * [queried][KairosScope.get].
 * [queried][DeferredValue.value].
 */
@ExperimentalKairosApi
class IncrementalLoop<K, V>(private val name: String? = null) : Incremental<K, V>() {
Loading