Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a71cb37c authored by Steve Elliott's avatar Steve Elliott Committed by Android (Google) Code Review
Browse files

Merge "[FRP] Functional reactive programming library" into main

parents 64e46a09 595a90eb
Loading
Loading
Loading
Loading
+49 −0
Original line number Diff line number Diff line
//
// Copyright (C) 2024 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//

package {
    default_team: "trendy_team_system_ui_please_use_a_more_specific_subteam_if_possible_",
    default_applicable_licenses: ["frameworks_base_packages_SystemUI_license"],
}

java_library {
    name: "kt-frp",
    host_supported: true,
    kotlincflags: ["-opt-in=com.android.systemui.experimental.frp.ExperimentalFrpApi"],
    srcs: ["src/**/*.kt"],
    static_libs: [
        "kotlin-stdlib",
        "kotlinx_coroutines",
    ],
}

java_test {
    name: "kt-frp-test",
    optimize: {
        enabled: false,
    },
    srcs: [
        "test/**/*.kt",
    ],
    static_libs: [
        "kt-frp",
        "junit",
        "kotlin-stdlib",
        "kotlin-test",
        "kotlinx_coroutines",
        "kotlinx_coroutines_test",
    ],
}
+3 −0
Original line number Diff line number Diff line
steell@google.com
nijamkin@google.com
evanlaird@google.com
+64 −0
Original line number Diff line number Diff line
# kt-frp

A functional reactive programming (FRP) library for Kotlin.

This library is **experimental** and should not be used for general production
code. The APIs within are subject to change, and there may be bugs.

## About FRP

Functional reactive programming is a type of reactive programming system that
follows a set of clear and composable rules, without sacrificing consistency.
FRP exposes an API that should be familiar to those versed in Kotlin `Flow`.

### Details for nerds

`kt-frp` implements an applicative / monadic flavor of FRP, using a push-pull
methodology to allow for efficient updates.

"Real" functional reactive programming should be specified with denotational
semantics ([wikipedia](https://en.wikipedia.org/wiki/Denotational_semantics)):
you can view the semantics for `kt-frp` [here](docs/semantics.md).

## Usage

First, stand up a new `FrpNetwork`. All reactive events and state is kept
consistent within a single network.

``` kotlin
val coroutineScope: CoroutineScope = ...
val frpNetwork = coroutineScope.newFrpNetwork()
```

You can use the `FrpNetwork` to stand-up a network of reactive events and state.
Events are modeled with `TFlow` (short for "transactional flow"), and state
`TState` (short for "transactional state").

``` kotlin
suspend fun activate(network: FrpNetwork) {
    network.activateSpec {
        val input = network.mutableTFlow<Unit>()
        // Launch a long-running side-effect that emits to the network
        // every second.
        launchEffect {
            while (true) {
                input.emit(Unit)
                delay(1.seconds)
            }
        }
        // Accumulate state
        val count: TState<Int> = input.fold { _, i -> i + 1 }
        // Observe events to perform side-effects in reaction to them
        input.observe {
            println("Got event ${count.sample()} at time: ${System.currentTimeMillis()}")
        }
    }
}
```

`FrpNetwork.activateSpec` will suspend indefinitely; cancelling the invocation
will tear-down all effects and obervers running within the lambda.

## Resources

- [Cheatsheet for those coming from Kotlin Flow](docs/flow-to-frp-cheatsheet.md)
+330 −0
Original line number Diff line number Diff line
# From Flows to FRP

## Key differences

* FRP evaluates all events (`TFlow` emissions + observers) in a transaction.

* FRP splits `Flow` APIs into two distinct types: `TFlow` and `TState`

    * `TFlow` is roughly equivalent to `SharedFlow` w/ a replay cache that
      exists for the duration of the current FRP transaction and shared with
      `SharingStarted.WhileSubscribed()`

    * `TState` is roughly equivalent to `StateFlow` shared with
      `SharingStarted.Eagerly`, but the current value can only be queried within
      a FRP transaction, and the value is only updated at the end of the
      transaction

* FRP further divides `Flow` APIs based on how they internally use state:

  * **FrpTransactionScope:** APIs that internally query some state need to be
    performed within an FRP transaction

    * this scope is available from the other scopes, and from most lambdas
      passed to other FRP APIs

  * **FrpStateScope:** APIs that internally accumulate state in reaction to
    events need to be performed within an FRP State scope (akin to a
    `CoroutineScope`)

    * this scope is a side-effect-free subset of FrpBuildScope, and so can be
      used wherever you have an FrpBuildScope

  * **FrpBuildScope:** APIs that perform external side-effects (`Flow.collect`)
    need to be performed within an FRP Build scope (akin to a `CoroutineScope`)

    * this scope is available from `FrpNetwork.activateSpec { … }`

  * All other APIs can be used anywhere

## emptyFlow()

Use `emptyTFlow`

``` kotlin
// this TFlow emits nothing
val noEvents: TFlow<Int> = emptyTFlow
```

## map { … }

Use `TFlow.map` / `TState.map`

``` kotlin
val anInt: TState<Int> = 
val squared: TState<Int> = anInt.map { it * it }
val messages: TFlow<String> = 
val messageLengths: TFlow<Int> = messages.map { it.size }
```

## filter { … } / mapNotNull { … }

### I have a TFlow

Use `TFlow.filter` / `TFlow.mapNotNull`

``` kotlin
val messages: TFlow<String> = 
val nonEmpty: TFlow<String> = messages.filter { it.isNotEmpty() }
```

### I have a TState

Convert the `TState` to `TFlow` using `TState.stateChanges`, then use
`TFlow.filter` / `TFlow.mapNotNull`

If you need to convert back to `TState`, use `TFlow.hold(initialValue)` on the
result.

``` kotlin
tState.stateChanges.filter {  }.hold(initialValue)
```

Note that `TFlow.hold` is only available within an `FrpStateScope` in order to
track the lifetime of the state accumulation.

## combine(...) { … }

### I have TStates

Use `combine(TStates)`

``` kotlin
val someInt: TState<Int> = 
val someString: TState<String> = 
val model: TState<MyModel> = combine(someInt, someString) { i, s -> MyModel(i, s) }
```

### I have TFlows

Convert the TFlows to TStates using `TFlow.hold(initialValue)`, then use
`combine(TStates)`

If you want the behavior of Flow.combine where nothing is emitted until each
TFlow has emitted at least once, you can use filter:

``` kotlin
// null used as an example, can use a different sentinel if needed
combine(tFlowA.hold(null), tFlowB.hold(null)) { a, b ->
        a?.let { b?.let {  } }
    }
    .filterNotNull()
```

Note that `TFlow.hold` is only available within an `FrpStateScope` in order to
track the lifetime of the state accumulation.

#### Explanation

`Flow.combine` always tracks the last-emitted value of each `Flow` it's
combining. This is a form of state-accumulation; internally, it collects from
each `Flow`, tracks the latest-emitted value, and when anything changes, it
re-runs the lambda to combine the latest values.

An effect of this is that `Flow.combine` doesn't emit until each combined `Flow`
has emitted at least once. This often bites developers. As a workaround,
developers generally append `.onStart { emit(initialValue) }` to the `Flows`
that don't immediately emit.

FRP avoids this gotcha by forcing usage of `TState` for `combine`, thus ensuring
that there is always a current value to be combined for each input.

## collect { … }

Use `observe { … }`

``` kotlin
val job: Job = tFlow.observe { println("observed: $it") }
```

Note that `observe` is only available within an `FrpBuildScope` in order to
track the lifetime of the observer. `FrpBuildScope` can only come from a
top-level `FrpNetwork.transaction { … }`, or a sub-scope created by using a
`-Latest` operator.

## sample(flow) { … }

### I want to sample a TState

Use `TState.sample()` to get the current value of a `TState`. This can be
invoked anywhere you have access to an `FrpTransactionScope`.

``` kotlin
// the lambda passed to map receives an FrpTransactionScope, so it can invoke
// sample
tFlow.map { tState.sample() }
```

#### Explanation

To keep all state-reads consistent, the current value of a TState can only be
queried within an FRP transaction, modeled with `FrpTransactionScope`. Note that
both `FrpStateScope` and `FrpBuildScope` extend `FrpTransactionScope`.

### I want to sample a TFlow

Convert to a `TState` by using `TFlow.hold(initialValue)`, then use `sample`.

Note that `hold` is only available within an `FrpStateScope` in order to track
the lifetime of the state accumulation.

## stateIn(scope, sharingStarted, initialValue)

Use `TFlow.hold(initialValue)`. There is no need to supply a sharingStarted
argument; all states are accumulated eagerly.

``` kotlin
val ints: TFlow<Int> = 
val lastSeenInt: TState<Int> = ints.hold(initialValue = 0)
```

Note that `hold` is only available within an `FrpStateScope` in order to track
the lifetime of the state accumulation (akin to the scope parameter of
`Flow.stateIn`). `FrpStateScope` can only come from a top-level
`FrpNetwork.transaction { … }`, or a sub-scope created by using a `-Latest`
operator. Also note that `FrpBuildScope` extends `FrpStateScope`.

## distinctUntilChanged()

Use `distinctUntilChanged` like normal. This is only available for `TFlow`;
`TStates` are already `distinctUntilChanged`.

## merge(...)

### I have TFlows

Use `merge(TFlows) { … }`. The lambda argument is used to disambiguate multiple
simultaneous emissions within the same transaction.

#### Explanation

Under FRP's rules, a `TFlow` may only emit up to once per transaction. This
means that if we are merging two or more `TFlows` that are emitting at the same
time (within the same transaction), the resulting merged `TFlow` must emit a
single value. The lambda argument allows the developer to decide what to do in
this case.

### I have TStates

If `combine` doesn't satisfy your needs, you can use `TState.stateChanges` to
convert to a `TFlow`, and then `merge`.

## conflatedCallbackFlow { … }

Use `tFlow { … }`.

As a shortcut, if you already have a `conflatedCallbackFlow { … }`, you can
convert it to a TFlow via `Flow.toTFlow()`.

Note that `tFlow` is only available within an `FrpBuildScope` in order to track
the lifetime of the input registration.

## first()

### I have a TState

Use `TState.sample`.

### I have a TFlow

Use `TFlow.nextOnly`, which works exactly like `Flow.first` but instead of
suspending it returns a `TFlow` that emits once.

The naming is intentionally different because `first` implies that it is the
first-ever value emitted from the `Flow` (which makes sense for cold `Flows`),
whereas `nextOnly` indicates that only the next value relative to the current
transaction (the one `nextOnly` is being invoked in) will be emitted.

Note that `nextOnly` is only available within an `FrpStateScope` in order to
track the lifetime of the state accumulation.

## flatMapLatest { … }

If you want to use -Latest to cancel old side-effects, similar to what the Flow
-Latest operators offer for coroutines, see `mapLatest`.

### I have a TState…

#### …and want to switch TStates

Use `TState.flatMap`

``` kotlin
val flattened = tState.flatMap { a -> getTState(a) }
```

#### …and want to switch TFlows

Use `TState<TFlow<T>>.switch()`

``` kotlin
val tFlow = tState.map { a -> getTFlow(a) }.switch()
```

### I have a TFlow…

#### …and want to switch TFlows

Use `hold` to convert to a `TState<TFlow<T>>`, then use `switch` to switch to
the latest `TFlow`.

``` kotlin
val tFlow = tFlowOfFlows.hold(emptyTFlow).switch()
```

#### …and want to switch TStates

Use `hold` to convert to a `TState<TState<T>>`, then use `flatMap` to switch to
the latest `TState`.

``` kotlin
val tState = tFlowOfStates.hold(tStateOf(initialValue)).flatMap { it }
```

## mapLatest { … } / collectLatest { … }

`FrpStateScope` and `FrpBuildScope` both provide `-Latest` operators that
automatically cancel old work when new values are emitted.

``` kotlin
val currentModel: TState<SomeModel> = 
val mapped: TState<...> = currentModel.mapLatestBuild { model ->
    effect { "new model in the house: $model" }
    model.someState.observe { "someState: $it" }
    val someData: TState<SomeInfo> =
        getBroadcasts(model.uri)
            .map { extractInfo(it) }
            .hold(initialInfo)
    
}
```

## flowOf(...)

### I want a TState

Use `tStateOf(initialValue)`.

### I want a TFlow

Use `now.map { initialValue }`

Note that `now` is only available within an `FrpTransactionScope`.

#### Explanation

`TFlows` are not cold, and so there isn't a notion of "emit this value once
there is a collector" like there is for `Flow`. The closest analog would be
`TState`, since the initial value is retained indefinitely until there is an
observer. However, it is often useful to immediately emit a value within the
current transaction, usually when using a `flatMap` or `switch`. In these cases,
using `now` explicitly models that the emission will occur within the current
transaction.

``` kotlin
fun <T> FrpTransactionScope.tFlowOf(value: T): TFlow<T> = now.map { value }
```

## MutableStateFlow / MutableSharedFlow

Use `MutableTState(frpNetwork, initialValue)` and `MutableTFlow(frpNetwork)`.
+225 −0
Original line number Diff line number Diff line
# FRP Semantics

`kt-frp`'s pure API is based off of the following denotational semantics
([wikipedia](https://en.wikipedia.org/wiki/Denotational_semantics)).

The semantics model `kt-frp` types as time-varying values; by making `Time` a
first-class value, we can define a referentially-transparent API that allows us
to reason about the behavior of the pure FRP combinators. This is
implementation-agnostic; we can compare the behavior of any implementation with
expected behavior denoted by these semantics to identify bugs.

The semantics are written in pseudo-Kotlin; places where we are deviating from
real Kotlin are noted with comments.

``` kotlin

sealed class Time : Comparable<Time> {
  object BigBang : Time()
  data class At(time: BigDecimal) : Time()
  object Infinity : Time()

  override final fun compareTo(other: Time): Int =
    when (this) {
      BigBang -> if (other === BigBang) 0 else -1
      is At -> when (other) {
        BigBang -> 1
        is At -> time.compareTo(other.time)
        Infinity -> -1
      }
      Infinity -> if (other === Infinity) 0 else 1
    }
}

typealias Transactional<T> = (Time) -> T

typealias TFlow<T> = SortedMap<Time, T>

private fun <T> SortedMap<Time, T>.pairwise(): List<Pair<Pair<Time, T>, Pair<Time<T>>>> =
  // NOTE: pretend evaluation is lazy, so that error() doesn't immediately throw
  (toList() + Pair(Time.Infinity, error("no value"))).zipWithNext()

class TState<T> internal constructor(
  internal val current: Transactional<T>,
  val stateChanges: TFlow<T>,
)

val emptyTFlow: TFlow<Nothing> = emptyMap()

fun <A, B> TFlow<A>.map(f: FrpTransactionScope.(A) -> B): TFlow<B> =
  mapValues { (t, a) -> FrpTransactionScope(t).f(a) }

fun <A> TFlow<A>.filter(f: FrpTransactionScope.(A) -> Boolean): TFlow<A> =
  filter { (t, a) -> FrpTransactionScope(t).f(a) }

fun <A> merge(
  first: TFlow<A>,
  second: TFlow<A>,
  onCoincidence: Time.(A, A) -> A,
): TFlow<A> =
  first.toMutableMap().also { result ->
    second.forEach { (t, a) ->
      result.merge(t, a) { f, s ->
        FrpTranscationScope(t).onCoincidence(f, a)
      }
    }
  }.toSortedMap()

fun <A> TState<TFlow<A>>.switch(): TFlow<A> {
  val truncated = listOf(Pair(Time.BigBang, current.invoke(Time.BigBang))) +
    stateChanges.dropWhile { (time, _) -> time < time0 }
  val events =
    truncated
      .pairwise()
      .flatMap { ((t0, sa), (t2, _)) ->
        sa.filter { (t1, a) -> t0 < t1 && t1 <= t2 }
      }
  return events.toSortedMap()
}

fun <A> TState<TFlow<A>>.switchPromptly(): TFlow<A> {
  val truncated = listOf(Pair(Time.BigBang, current.invoke(Time.BigBang))) +
    stateChanges.dropWhile { (time, _) -> time < time0 }
  val events =
    truncated
      .pairwise()
      .flatMap { ((t0, sa), (t2, _)) ->
        sa.filter { (t1, a) -> t0 <= t1 && t1 <= t2 }
      }
  return events.toSortedMap()
}

typealias GroupedTFlow<K, V> = TFlow<Map<K, V>>

fun <K, V> TFlow<Map<K, V>>.groupByKey(): GroupedTFlow<K, V> = this

fun <K, V> GroupedTFlow<K, V>.eventsForKey(key: K): TFlow<V> =
  map { m -> m[k] }.filter { it != null }.map { it!! }

fun <A, B> TState<A>.map(f: (A) -> B): TState<B> =
  TState(
    current = { t -> f(current.invoke(t)) },
    stateChanges = stateChanges.map { f(it) },
  )

fun <A, B, C> TState<A>.combineWith(
  other: TState<B>,
  f: (A, B) -> C,
): TState<C> =
  TState(
    current = { t -> f(current.invoke(t), other.current.invoke(t)) },
    stateChanges = run {
      val aChanges =
        stateChanges
          .map { a ->
            val b = other.current.sample()
            Triple(a, b, f(a, b))
          }
      val bChanges =
        other
          .stateChanges
          .map { b ->
            val a = current.sample()
            Triple(a, b, f(a, b))
          }
      merge(aChanges, bChanges) { (a, _, _), (_, b, _) ->
          Triple(a, b, f(a, b))
        }
        .map { (_, _, zipped) -> zipped }
    },
  )

fun <A> TState<TState<A>>.flatten(): TState<A> {
  val changes =
    stateChanges
      .pairwise()
      .flatMap { ((t0, oldInner), (t2, _)) ->
        val inWindow =
          oldInner
            .stateChanges
            .filter { (t1, b) -> t0 <= t1 && t1 < t2 }
        if (inWindow.firstOrNull()?.time != t0) {
          listOf(Pair(t0, oldInner.current.invoke(t0))) + inWindow
        } else {
          inWindow
        }
      }
  return TState(
    current = { t -> current.invoke(t).current.invoke(t) },
    stateChanges = changes.toSortedMap(),
  )
}

open class FrpTranscationScope internal constructor(
  internal val currentTime: Time,
) {
  val now: TFlow<Unit> =
    sortedMapOf(currentTime to Unit)

  fun <A> Transactional<A>.sample(): A =
    invoke(currentTime)

  fun <A> TState<A>.sample(): A =
    current.sample()
}

class FrpStateScope internal constructor(
  time: Time,
  internal val stopTime: Time,
): FrpTransactionScope(time) {

  fun <A, B> TFlow<A>.fold(
    initialValue: B,
    f: FrpTransactionScope.(B, A) -> B,
  ): TState<B> {
    val truncated =
      dropWhile { (t, _) -> t < currentTime }
        .takeWhile { (t, _) -> t <= stopTime }
    val folded =
      truncated
        .scan(Pair(currentTime, initialValue)) { (_, b) (t, a) ->
          Pair(t, FrpTransactionScope(t).f(a, b))
        }
    val lookup = { t1 ->
      folded.lastOrNull { (t0, _) -> t0 < t1 }?.value ?: initialValue
    }
    return TState(lookup, folded.toSortedMap())
  }

  fun <A> TFlow<A>.hold(initialValue: A): TState<A> =
    fold(initialValue) { _, a -> a }

  fun <K, V> TFlow<Map<K, Maybe<V>>>.foldMapIncrementally(
    initialValues: Map<K, V>
  ): TState<Map<K, V>> =
    fold(initialValues) { patch, map ->
      val eithers = patch.map { (k, v) ->
        if (v is Just) Left(k to v.value) else Right(k)
      }
      val adds = eithers.filterIsInstance<Left>().map { it.left }
      val removes = eithers.filterIsInstance<Right>().map { it.right }
      val removed: Map<K, V> = map - removes.toSet()
      val updated: Map<K, V> = removed + adds
      updated
    }

  fun <K : Any, V> TFlow<Map<K, Maybe<TFlow<V>>>>.mergeIncrementally(
    initialTFlows: Map<K, TFlow<V>>,
  ): TFlow<Map<K, V>> =
    foldMapIncrementally(initialTFlows).map { it.merge() }.switch()

  fun <K, A, B> TFlow<Map<K, Maybe<A>>.mapLatestStatefulForKey(
    transform: suspend FrpStateScope.(A) -> B,
  ): TFlow<Map<K, Maybe<B>>> =
    pairwise().map { ((t0, patch), (t1, _)) ->
      patch.map { (k, ma) ->
        ma.map { a ->
          FrpStateScope(t0, t1).transform(a)
        }
      }
    }
  }

}

```
Loading