Loading packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Booleans.kt 0 → 100644 +32 −0 Original line number Diff line number Diff line /* * Copyright (C) 2025 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.systemui.kairos /** Returns a [State] that is `true` only when all of [states] are `true`. */ @ExperimentalKairosApi fun allOf(vararg states: State<Boolean>): State<Boolean> = combine(*states) { it.allTrue() } /** Returns a [State] that is `true` when any of [states] are `true`. */ @ExperimentalKairosApi fun anyOf(vararg states: State<Boolean>): State<Boolean> = combine(*states) { it.anyTrue() } /** Returns a [State] containing the inverse of the Boolean held by the original [State]. */ @ExperimentalKairosApi fun not(state: State<Boolean>): State<Boolean> = state.mapCheapUnsafe { !it } private fun Iterable<Boolean>.allTrue() = all { it } private fun Iterable<Boolean>.anyTrue() = any { it } packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt +105 −103 Original line number Diff line number Diff line Loading @@ -17,17 +17,14 @@ package com.android.systemui.kairos import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.map import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.Job import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.MutableSharedFlow Loading @@ -36,9 +33,8 @@ import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.dropWhile import kotlinx.coroutines.flow.scan import kotlinx.coroutines.launch /** A function that modifies the KairosNetwork. */ /** A computation that can modify the Kairos network. */ typealias BuildSpec<A> = BuildScope.() -> A /** Loading @@ -56,17 +52,7 @@ inline operator fun <A> BuildScope.invoke(block: BuildScope.() -> A) = run(block /** Operations that add inputs and outputs to a Kairos network. */ @ExperimentalKairosApi interface BuildScope : StateScope { /** * A [KairosNetwork] handle that is bound to this [BuildScope]. * * It supports all of the standard functionality by which external code can interact with this * Kairos network, but all [activated][KairosNetwork.activateSpec] [BuildSpec]s are bound as * children to this [BuildScope], such that when this [BuildScope] is destroyed, all children * are also destroyed. */ val kairosNetwork: KairosNetwork interface BuildScope : HasNetwork, StateScope { /** * Defers invoking [block] until after the current [BuildScope] code-path completes, returning a Loading Loading @@ -110,11 +96,21 @@ interface BuildScope : StateScope { * executed if this [BuildScope] is still active by that time. It can be deactivated due to a * -Latest combinator, for example. * * Shorthand for: * [Disposing][DisposableHandle.dispose] of the returned [DisposableHandle] will stop the * observation of new emissions. It will however *not* cancel any running effects from previous * emissions. To achieve this behavior, use [launchScope] or [asyncScope] to create a child * build scope: * ``` kotlin * events.observe { effect { ... } } * val job = launchScope { * events.observe { x -> * launchEffect { longRunningEffect(x) } * } * } * // cancels observer and any running effects: * job.cancel() * ``` */ // TODO: remove disposable handle return? might add more confusion than convenience fun <A> Events<A>.observe( coroutineContext: CoroutineContext = EmptyCoroutineContext, block: EffectScope.(A) -> Unit = {}, Loading @@ -129,7 +125,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<BuildSpec<A>>>>.applyLatestSpecForKey( Loading @@ -138,10 +134,10 @@ interface BuildScope : StateScope { ): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>> /** * Creates an instance of an [Events] with elements that are from [builder]. * Creates an instance of an [Events] with elements that are emitted from [builder]. * * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the * provided [MutableState]. * provided [EventProducerScope]. * * By default, [builder] is only running while the returned [Events] is being * [observed][observe]. If you want it to run at all times, simply add a no-op observer: Loading @@ -149,16 +145,16 @@ interface BuildScope : StateScope { * events { ... }.apply { observe() } * ``` */ fun <T> events( name: String? = null, builder: suspend EventProducerScope<T>.() -> Unit, ): Events<T> // TODO: eventually this should be defined on KairosNetwork + an extension on HasNetwork // - will require modifying InputNode so that it can be manually killed, as opposed to using // takeUntil (which requires a StateScope). fun <T> events(builder: suspend EventProducerScope<T>.() -> Unit): Events<T> /** * Creates an instance of an [Events] with elements that are emitted from [builder]. * * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the * provided [MutableState]. * provided [CoalescingEventProducerScope]. * * By default, [builder] is only running while the returned [Events] is being * [observed][observe]. If you want it to run at all times, simply add a no-op observer: Loading @@ -171,6 +167,7 @@ interface BuildScope : StateScope { * [coalesce]. Once the batch is consumed by the kairos network in the next transaction, the * batch is reset back to [getInitialValue]. */ // TODO: see TODO for [events] fun <In, Out> coalescingEvents( getInitialValue: () -> Out, coalesce: (old: Out, new: In) -> Out, Loading @@ -186,6 +183,7 @@ interface BuildScope : StateScope { * * The return value from [block] can be accessed via the returned [DeferredValue]. */ // TODO: return a DisposableHandle instead of Job? fun <A> asyncScope(block: BuildSpec<A>): Pair<DeferredValue<A>, Job> // TODO: once we have context params, these can all become extensions: Loading @@ -198,9 +196,9 @@ interface BuildScope : StateScope { * outside of the current Kairos transaction; when [transform] returns, the returned value is * emitted from the result [Events] in a new transaction. * * Shorthand for: * ``` kotlin * events.mapLatestBuild { a -> asyncEvent { transform(a) } }.flatten() * fun <A, B> Events<A>.mapAsyncLatest(transform: suspend (A) -> B): Events<B> = * mapLatestBuild { a -> asyncEvent { transform(a) } }.flatten() * ``` */ fun <A, B> Events<A>.mapAsyncLatest(transform: suspend (A) -> B): Events<B> = Loading @@ -219,42 +217,19 @@ interface BuildScope : StateScope { /** * Returns a [StateFlow] whose [value][StateFlow.value] tracks the current * [value of this State][State.sample], and will emit at the same rate as [State.changes]. * * Note that the [value][StateFlow.value] is not available until the *end* of the current * transaction. If you need the current value before this time, then use [State.sample]. */ fun <A> State<A>.toStateFlow(): StateFlow<A> { val uninitialized = Any() var initialValue: Any? = uninitialized val innerStateFlow = MutableStateFlow<Any?>(uninitialized) deferredBuildScope { initialValue = sample() changes.observe { innerStateFlow.value = it initialValue = null } } @Suppress("UNCHECKED_CAST") fun getValue(innerValue: Any?): A = when { innerValue !== uninitialized -> innerValue as A initialValue !== uninitialized -> initialValue as A else -> error( "Attempted to access StateFlow.value before Kairos transaction has completed." ) } val innerStateFlow = MutableStateFlow(sampleDeferred()) changes.observe { innerStateFlow.value = deferredOf(it) } return object : StateFlow<A> { override val replayCache: List<A> get() = innerStateFlow.replayCache.map(::getValue) get() = innerStateFlow.replayCache.map { it.value } override val value: A get() = getValue(innerStateFlow.value) get() = innerStateFlow.value.value override suspend fun collect(collector: FlowCollector<A>): Nothing { innerStateFlow.collect { collector.emit(getValue(it)) } innerStateFlow.collect { collector.emit(it.value) } } } } Loading Loading @@ -365,14 +340,14 @@ interface BuildScope : StateScope { initialSpec: BuildSpec<A> ): Pair<Events<B>, DeferredValue<A>> { val (events, result) = mapCheap { spec -> mapOf(Unit to just(spec)) } mapCheap { spec -> mapOf(Unit to Maybe.present(spec)) } .applyLatestSpecForKey(initialSpecs = mapOf(Unit to initialSpec), numKeys = 1) val outEvents: Events<B> = events.mapMaybe { checkNotNull(it[Unit]) { "applyLatest: expected result, but none present in: $it" } } val outInit: DeferredValue<A> = deferredBuildScope { val initResult: Map<Unit, A> = result.get() val initResult: Map<Unit, A> = result.value check(Unit in initResult) { "applyLatest: expected initial result, but none present in: $initResult" } Loading Loading @@ -425,7 +400,7 @@ interface BuildScope : StateScope { transform: BuildScope.(A) -> B, ): Pair<Events<B>, DeferredValue<B>> = mapCheap { buildSpec { transform(it) } } .applyLatestSpec(initialSpec = buildSpec { transform(initialValue.get()) }) .applyLatestSpec(initialSpec = buildSpec { transform(initialValue.value) }) /** * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the Loading @@ -436,7 +411,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<BuildSpec<A>>>>.applyLatestSpecForKey( Loading @@ -445,6 +420,17 @@ interface BuildScope : StateScope { ): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>> = applyLatestSpecForKey(deferredOf(initialSpecs), numKeys) /** * Returns an [Incremental] containing the results of applying each [BuildSpec] emitted from the * original [Incremental]. * * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, V> Incremental<K, BuildSpec<V>>.applyLatestSpecForKey( numKeys: Int? = null ): Incremental<K, V> { Loading @@ -460,7 +446,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.applyLatestSpecForKey( Loading @@ -476,7 +462,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey( Loading @@ -495,7 +481,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey( Loading @@ -513,7 +499,7 @@ interface BuildScope : StateScope { * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are * cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildScope] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey( Loading @@ -524,7 +510,7 @@ interface BuildScope : StateScope { map { patch -> patch.mapValues { (k, v) -> v.map { buildSpec { transform(k, it) } } } } .applyLatestSpecForKey( deferredBuildScope { initialValues.get().mapValues { (k, v) -> buildSpec { transform(k, v) } } initialValues.value.mapValues { (k, v) -> buildSpec { transform(k, v) } } }, numKeys = numKeys, ) Loading @@ -539,7 +525,7 @@ interface BuildScope : StateScope { * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are * cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildScope] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey( Loading @@ -558,7 +544,7 @@ interface BuildScope : StateScope { * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are * cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildScope] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey( Loading @@ -570,7 +556,7 @@ interface BuildScope : StateScope { fun <R> Events<R>.nextDeferred(): Deferred<R> { lateinit var next: CompletableDeferred<R> val job = launchScope { nextOnly().observe { next.complete(it) } } next = CompletableDeferred<R>(parent = job) next = CompletableDeferred(parent = job) return next } Loading @@ -581,8 +567,7 @@ interface BuildScope : StateScope { } /** Returns an [Events] that emits whenever this [Flow] emits. */ fun <A> Flow<A>.toEvents(name: String? = null): Events<A> = events(name) { collect { emit(it) } } fun <A> Flow<A>.toEvents(): Events<A> = events { collect { emit(it) } } /** * Shorthand for: Loading Loading @@ -679,6 +664,13 @@ interface BuildScope : StateScope { * Invokes [block] on the value held in this [State]. [block] receives an [BuildScope] that can * be used to make further modifications to the Kairos network, and/or perform side-effects via * [effect]. * * ``` kotlin * fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope { * block(sample()) * changes.observeBuild(block) * } * ``` */ fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope { block(sample()) Loading Loading @@ -706,12 +698,9 @@ interface BuildScope : StateScope { * outside of the current Kairos transaction; when it completes, the returned [Events] emits in a * new transaction. * * Shorthand for: * ``` * events { emitter: MutableEvents<A> -> * val a = block() * emitter.emit(a) * } * ``` kotlin * fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> = * events { emit(block()) }.apply { observe() } * ``` */ @ExperimentalKairosApi Loading @@ -730,9 +719,12 @@ fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> = * executed if this [BuildScope] is still active by that time. It can be deactivated due to a * -Latest combinator, for example. * * Shorthand for: * ``` kotlin * launchScope { now.observe { block() } } * fun BuildScope.effect( * context: CoroutineContext = EmptyCoroutineContext, * block: EffectScope.() -> Unit, * ): Job = * launchScope { now.observe(context) { block() } } * ``` */ @ExperimentalKairosApi Loading @@ -748,13 +740,14 @@ fun BuildScope.effect( * done because the current [BuildScope] might be deactivated within this transaction, perhaps due * to a -Latest combinator. If this happens, then the coroutine will never actually be started. * * Shorthand for: * ``` kotlin * fun BuildScope.launchEffect(block: suspend KairosScope.() -> Unit): Job = * effect { effectCoroutineScope.launch { block() } } * ``` */ @ExperimentalKairosApi fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asyncEffect(block) fun BuildScope.launchEffect(block: suspend KairosCoroutineScope.() -> Unit): Job = asyncEffect(block) /** * Launches [block] in a new coroutine, returning the result as a [Deferred]. Loading @@ -765,16 +758,17 @@ fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asy * * Shorthand for: * ``` kotlin * fun <R> BuildScope.asyncEffect(block: suspend KairosScope.() -> R): Deferred<R> = * CompletableDeferred<R>.apply { * effect { effectCoroutineScope.launch { complete(coroutineScope { block() }) } } * effect { effectCoroutineScope.launch { complete(block()) } } * } * .await() * ``` */ @ExperimentalKairosApi fun <R> BuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> { fun <R> BuildScope.asyncEffect(block: suspend KairosCoroutineScope.() -> R): Deferred<R> { val result = CompletableDeferred<R>() val job = effect { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } } val job = effect { launch { result.complete(block()) } } val handle = job.invokeOnCompletion { result.cancel() } result.invokeOnCompletion { handle.dispose() Loading Loading @@ -837,7 +831,7 @@ fun <T> BuildScope.conflatedEvents( } /** Scope for emitting to a [BuildScope.coalescingEvents]. */ interface CoalescingEventProducerScope<in T> { fun interface CoalescingEventProducerScope<in T> { /** * Inserts [value] into the current batch, enqueueing it for emission from this [Events] if not * already pending. Loading @@ -850,7 +844,7 @@ interface CoalescingEventProducerScope<in T> { } /** Scope for emitting to a [BuildScope.events]. */ interface EventProducerScope<in T> { fun interface EventProducerScope<in T> { /** * Emits a [value] to this [Events], suspending the caller until the Kairos transaction * containing the emission has completed. Loading @@ -868,3 +862,11 @@ suspend fun awaitClose(block: () -> Unit): Nothing = } finally { block() } /** * Runs [spec] in this [BuildScope], and then re-runs it whenever [rebuildSignal] emits. Returns a * [State] that holds the result of the currently-active [BuildSpec]. */ @ExperimentalKairosApi fun <A> BuildScope.rebuildOn(rebuildSignal: Events<*>, spec: BuildSpec<A>): State<A> = rebuildSignal.map { spec }.holdLatestSpec(spec) Loading
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/Booleans.kt 0 → 100644 +32 −0 Original line number Diff line number Diff line /* * Copyright (C) 2025 The Android Open Source Project * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.android.systemui.kairos /** Returns a [State] that is `true` only when all of [states] are `true`. */ @ExperimentalKairosApi fun allOf(vararg states: State<Boolean>): State<Boolean> = combine(*states) { it.allTrue() } /** Returns a [State] that is `true` when any of [states] are `true`. */ @ExperimentalKairosApi fun anyOf(vararg states: State<Boolean>): State<Boolean> = combine(*states) { it.anyTrue() } /** Returns a [State] containing the inverse of the Boolean held by the original [State]. */ @ExperimentalKairosApi fun not(state: State<Boolean>): State<Boolean> = state.mapCheapUnsafe { !it } private fun Iterable<Boolean>.allTrue() = all { it } private fun Iterable<Boolean>.anyTrue() = any { it }
packages/SystemUI/utils/kairos/src/com/android/systemui/kairos/BuildScope.kt +105 −103 Original line number Diff line number Diff line Loading @@ -17,17 +17,14 @@ package com.android.systemui.kairos import com.android.systemui.kairos.util.Maybe import com.android.systemui.kairos.util.just import com.android.systemui.kairos.util.map import kotlin.coroutines.CoroutineContext import kotlin.coroutines.EmptyCoroutineContext import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Deferred import kotlinx.coroutines.DisposableHandle import kotlinx.coroutines.Job import kotlinx.coroutines.awaitCancellation import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.FlowCollector import kotlinx.coroutines.flow.MutableSharedFlow Loading @@ -36,9 +33,8 @@ import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.dropWhile import kotlinx.coroutines.flow.scan import kotlinx.coroutines.launch /** A function that modifies the KairosNetwork. */ /** A computation that can modify the Kairos network. */ typealias BuildSpec<A> = BuildScope.() -> A /** Loading @@ -56,17 +52,7 @@ inline operator fun <A> BuildScope.invoke(block: BuildScope.() -> A) = run(block /** Operations that add inputs and outputs to a Kairos network. */ @ExperimentalKairosApi interface BuildScope : StateScope { /** * A [KairosNetwork] handle that is bound to this [BuildScope]. * * It supports all of the standard functionality by which external code can interact with this * Kairos network, but all [activated][KairosNetwork.activateSpec] [BuildSpec]s are bound as * children to this [BuildScope], such that when this [BuildScope] is destroyed, all children * are also destroyed. */ val kairosNetwork: KairosNetwork interface BuildScope : HasNetwork, StateScope { /** * Defers invoking [block] until after the current [BuildScope] code-path completes, returning a Loading Loading @@ -110,11 +96,21 @@ interface BuildScope : StateScope { * executed if this [BuildScope] is still active by that time. It can be deactivated due to a * -Latest combinator, for example. * * Shorthand for: * [Disposing][DisposableHandle.dispose] of the returned [DisposableHandle] will stop the * observation of new emissions. It will however *not* cancel any running effects from previous * emissions. To achieve this behavior, use [launchScope] or [asyncScope] to create a child * build scope: * ``` kotlin * events.observe { effect { ... } } * val job = launchScope { * events.observe { x -> * launchEffect { longRunningEffect(x) } * } * } * // cancels observer and any running effects: * job.cancel() * ``` */ // TODO: remove disposable handle return? might add more confusion than convenience fun <A> Events<A>.observe( coroutineContext: CoroutineContext = EmptyCoroutineContext, block: EffectScope.(A) -> Unit = {}, Loading @@ -129,7 +125,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<BuildSpec<A>>>>.applyLatestSpecForKey( Loading @@ -138,10 +134,10 @@ interface BuildScope : StateScope { ): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>> /** * Creates an instance of an [Events] with elements that are from [builder]. * Creates an instance of an [Events] with elements that are emitted from [builder]. * * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the * provided [MutableState]. * provided [EventProducerScope]. * * By default, [builder] is only running while the returned [Events] is being * [observed][observe]. If you want it to run at all times, simply add a no-op observer: Loading @@ -149,16 +145,16 @@ interface BuildScope : StateScope { * events { ... }.apply { observe() } * ``` */ fun <T> events( name: String? = null, builder: suspend EventProducerScope<T>.() -> Unit, ): Events<T> // TODO: eventually this should be defined on KairosNetwork + an extension on HasNetwork // - will require modifying InputNode so that it can be manually killed, as opposed to using // takeUntil (which requires a StateScope). fun <T> events(builder: suspend EventProducerScope<T>.() -> Unit): Events<T> /** * Creates an instance of an [Events] with elements that are emitted from [builder]. * * [builder] is run in its own coroutine, allowing for ongoing work that can emit to the * provided [MutableState]. * provided [CoalescingEventProducerScope]. * * By default, [builder] is only running while the returned [Events] is being * [observed][observe]. If you want it to run at all times, simply add a no-op observer: Loading @@ -171,6 +167,7 @@ interface BuildScope : StateScope { * [coalesce]. Once the batch is consumed by the kairos network in the next transaction, the * batch is reset back to [getInitialValue]. */ // TODO: see TODO for [events] fun <In, Out> coalescingEvents( getInitialValue: () -> Out, coalesce: (old: Out, new: In) -> Out, Loading @@ -186,6 +183,7 @@ interface BuildScope : StateScope { * * The return value from [block] can be accessed via the returned [DeferredValue]. */ // TODO: return a DisposableHandle instead of Job? fun <A> asyncScope(block: BuildSpec<A>): Pair<DeferredValue<A>, Job> // TODO: once we have context params, these can all become extensions: Loading @@ -198,9 +196,9 @@ interface BuildScope : StateScope { * outside of the current Kairos transaction; when [transform] returns, the returned value is * emitted from the result [Events] in a new transaction. * * Shorthand for: * ``` kotlin * events.mapLatestBuild { a -> asyncEvent { transform(a) } }.flatten() * fun <A, B> Events<A>.mapAsyncLatest(transform: suspend (A) -> B): Events<B> = * mapLatestBuild { a -> asyncEvent { transform(a) } }.flatten() * ``` */ fun <A, B> Events<A>.mapAsyncLatest(transform: suspend (A) -> B): Events<B> = Loading @@ -219,42 +217,19 @@ interface BuildScope : StateScope { /** * Returns a [StateFlow] whose [value][StateFlow.value] tracks the current * [value of this State][State.sample], and will emit at the same rate as [State.changes]. * * Note that the [value][StateFlow.value] is not available until the *end* of the current * transaction. If you need the current value before this time, then use [State.sample]. */ fun <A> State<A>.toStateFlow(): StateFlow<A> { val uninitialized = Any() var initialValue: Any? = uninitialized val innerStateFlow = MutableStateFlow<Any?>(uninitialized) deferredBuildScope { initialValue = sample() changes.observe { innerStateFlow.value = it initialValue = null } } @Suppress("UNCHECKED_CAST") fun getValue(innerValue: Any?): A = when { innerValue !== uninitialized -> innerValue as A initialValue !== uninitialized -> initialValue as A else -> error( "Attempted to access StateFlow.value before Kairos transaction has completed." ) } val innerStateFlow = MutableStateFlow(sampleDeferred()) changes.observe { innerStateFlow.value = deferredOf(it) } return object : StateFlow<A> { override val replayCache: List<A> get() = innerStateFlow.replayCache.map(::getValue) get() = innerStateFlow.replayCache.map { it.value } override val value: A get() = getValue(innerStateFlow.value) get() = innerStateFlow.value.value override suspend fun collect(collector: FlowCollector<A>): Nothing { innerStateFlow.collect { collector.emit(getValue(it)) } innerStateFlow.collect { collector.emit(it.value) } } } } Loading Loading @@ -365,14 +340,14 @@ interface BuildScope : StateScope { initialSpec: BuildSpec<A> ): Pair<Events<B>, DeferredValue<A>> { val (events, result) = mapCheap { spec -> mapOf(Unit to just(spec)) } mapCheap { spec -> mapOf(Unit to Maybe.present(spec)) } .applyLatestSpecForKey(initialSpecs = mapOf(Unit to initialSpec), numKeys = 1) val outEvents: Events<B> = events.mapMaybe { checkNotNull(it[Unit]) { "applyLatest: expected result, but none present in: $it" } } val outInit: DeferredValue<A> = deferredBuildScope { val initResult: Map<Unit, A> = result.get() val initResult: Map<Unit, A> = result.value check(Unit in initResult) { "applyLatest: expected initial result, but none present in: $initResult" } Loading Loading @@ -425,7 +400,7 @@ interface BuildScope : StateScope { transform: BuildScope.(A) -> B, ): Pair<Events<B>, DeferredValue<B>> = mapCheap { buildSpec { transform(it) } } .applyLatestSpec(initialSpec = buildSpec { transform(initialValue.get()) }) .applyLatestSpec(initialSpec = buildSpec { transform(initialValue.value) }) /** * Returns an [Events] containing the results of applying each [BuildSpec] emitted from the Loading @@ -436,7 +411,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<BuildSpec<A>>>>.applyLatestSpecForKey( Loading @@ -445,6 +420,17 @@ interface BuildScope : StateScope { ): Pair<Events<Map<K, Maybe<A>>>, DeferredValue<Map<K, B>>> = applyLatestSpecForKey(deferredOf(initialSpecs), numKeys) /** * Returns an [Incremental] containing the results of applying each [BuildSpec] emitted from the * original [Incremental]. * * When each [BuildSpec] is applied, changes from the previously-active [BuildSpec] with the * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, V> Incremental<K, BuildSpec<V>>.applyLatestSpecForKey( numKeys: Int? = null ): Incremental<K, V> { Loading @@ -460,7 +446,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.applyLatestSpecForKey( Loading @@ -476,7 +462,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey( Loading @@ -495,7 +481,7 @@ interface BuildScope : StateScope { * same key are undone (any registered [observers][observe] are unregistered, and any pending * [side-effects][effect] are cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildSpec] will be undone with no replacement. */ fun <K, V> Events<Map<K, Maybe<BuildSpec<V>>>>.holdLatestSpecForKey( Loading @@ -513,7 +499,7 @@ interface BuildScope : StateScope { * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are * cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildScope] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey( Loading @@ -524,7 +510,7 @@ interface BuildScope : StateScope { map { patch -> patch.mapValues { (k, v) -> v.map { buildSpec { transform(k, it) } } } } .applyLatestSpecForKey( deferredBuildScope { initialValues.get().mapValues { (k, v) -> buildSpec { transform(k, v) } } initialValues.value.mapValues { (k, v) -> buildSpec { transform(k, v) } } }, numKeys = numKeys, ) Loading @@ -539,7 +525,7 @@ interface BuildScope : StateScope { * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are * cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildScope] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey( Loading @@ -558,7 +544,7 @@ interface BuildScope : StateScope { * registered [observers][observe] are unregistered, and any pending [side-effects][effect] are * cancelled). * * If the [Maybe] contained within the value for an associated key is [none], then the * If the [Maybe] value for an associated key is [absent][Maybe.absent], then the * previously-active [BuildScope] will be undone with no replacement. */ fun <K, A, B> Events<Map<K, Maybe<A>>>.mapLatestBuildForKey( Loading @@ -570,7 +556,7 @@ interface BuildScope : StateScope { fun <R> Events<R>.nextDeferred(): Deferred<R> { lateinit var next: CompletableDeferred<R> val job = launchScope { nextOnly().observe { next.complete(it) } } next = CompletableDeferred<R>(parent = job) next = CompletableDeferred(parent = job) return next } Loading @@ -581,8 +567,7 @@ interface BuildScope : StateScope { } /** Returns an [Events] that emits whenever this [Flow] emits. */ fun <A> Flow<A>.toEvents(name: String? = null): Events<A> = events(name) { collect { emit(it) } } fun <A> Flow<A>.toEvents(): Events<A> = events { collect { emit(it) } } /** * Shorthand for: Loading Loading @@ -679,6 +664,13 @@ interface BuildScope : StateScope { * Invokes [block] on the value held in this [State]. [block] receives an [BuildScope] that can * be used to make further modifications to the Kairos network, and/or perform side-effects via * [effect]. * * ``` kotlin * fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope { * block(sample()) * changes.observeBuild(block) * } * ``` */ fun <A> State<A>.observeBuild(block: BuildScope.(A) -> Unit = {}): Job = launchScope { block(sample()) Loading Loading @@ -706,12 +698,9 @@ interface BuildScope : StateScope { * outside of the current Kairos transaction; when it completes, the returned [Events] emits in a * new transaction. * * Shorthand for: * ``` * events { emitter: MutableEvents<A> -> * val a = block() * emitter.emit(a) * } * ``` kotlin * fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> = * events { emit(block()) }.apply { observe() } * ``` */ @ExperimentalKairosApi Loading @@ -730,9 +719,12 @@ fun <A> BuildScope.asyncEvent(block: suspend () -> A): Events<A> = * executed if this [BuildScope] is still active by that time. It can be deactivated due to a * -Latest combinator, for example. * * Shorthand for: * ``` kotlin * launchScope { now.observe { block() } } * fun BuildScope.effect( * context: CoroutineContext = EmptyCoroutineContext, * block: EffectScope.() -> Unit, * ): Job = * launchScope { now.observe(context) { block() } } * ``` */ @ExperimentalKairosApi Loading @@ -748,13 +740,14 @@ fun BuildScope.effect( * done because the current [BuildScope] might be deactivated within this transaction, perhaps due * to a -Latest combinator. If this happens, then the coroutine will never actually be started. * * Shorthand for: * ``` kotlin * fun BuildScope.launchEffect(block: suspend KairosScope.() -> Unit): Job = * effect { effectCoroutineScope.launch { block() } } * ``` */ @ExperimentalKairosApi fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asyncEffect(block) fun BuildScope.launchEffect(block: suspend KairosCoroutineScope.() -> Unit): Job = asyncEffect(block) /** * Launches [block] in a new coroutine, returning the result as a [Deferred]. Loading @@ -765,16 +758,17 @@ fun BuildScope.launchEffect(block: suspend CoroutineScope.() -> Unit): Job = asy * * Shorthand for: * ``` kotlin * fun <R> BuildScope.asyncEffect(block: suspend KairosScope.() -> R): Deferred<R> = * CompletableDeferred<R>.apply { * effect { effectCoroutineScope.launch { complete(coroutineScope { block() }) } } * effect { effectCoroutineScope.launch { complete(block()) } } * } * .await() * ``` */ @ExperimentalKairosApi fun <R> BuildScope.asyncEffect(block: suspend CoroutineScope.() -> R): Deferred<R> { fun <R> BuildScope.asyncEffect(block: suspend KairosCoroutineScope.() -> R): Deferred<R> { val result = CompletableDeferred<R>() val job = effect { effectCoroutineScope.launch { result.complete(coroutineScope(block)) } } val job = effect { launch { result.complete(block()) } } val handle = job.invokeOnCompletion { result.cancel() } result.invokeOnCompletion { handle.dispose() Loading Loading @@ -837,7 +831,7 @@ fun <T> BuildScope.conflatedEvents( } /** Scope for emitting to a [BuildScope.coalescingEvents]. */ interface CoalescingEventProducerScope<in T> { fun interface CoalescingEventProducerScope<in T> { /** * Inserts [value] into the current batch, enqueueing it for emission from this [Events] if not * already pending. Loading @@ -850,7 +844,7 @@ interface CoalescingEventProducerScope<in T> { } /** Scope for emitting to a [BuildScope.events]. */ interface EventProducerScope<in T> { fun interface EventProducerScope<in T> { /** * Emits a [value] to this [Events], suspending the caller until the Kairos transaction * containing the emission has completed. Loading @@ -868,3 +862,11 @@ suspend fun awaitClose(block: () -> Unit): Nothing = } finally { block() } /** * Runs [spec] in this [BuildScope], and then re-runs it whenever [rebuildSignal] emits. Returns a * [State] that holds the result of the currently-active [BuildSpec]. */ @ExperimentalKairosApi fun <A> BuildScope.rebuildOn(rebuildSignal: Events<*>, spec: BuildSpec<A>): State<A> = rebuildSignal.map { spec }.holdLatestSpec(spec)