Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a5910e79 authored by Steve Elliott's avatar Steve Elliott
Browse files

Flow utility functions: pairwise + setChanges

Bug: 241121499
Test: atest PairwiseFlowTest
Change-Id: I5c7c81c7889a388bcede2a9fa25ff48b2be5c92b
parent 3292a30a
Loading
Loading
Loading
Loading
+98 −0
Original line number Original line Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.systemui.util.kotlin

import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.drop
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.zip

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
 * Note that the new Flow will not start emitting until it has received two emissions from the
 * upstream Flow.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> {
    // same as current flow, but with the very first event skipped
    val nextEvents = drop(1)
    // zip current flow and nextEvents; transform will receive a pair of old and new value. This
    // works because zip will suppress emissions until both flows have emitted something; since in
    // this case both flows are emitting at the same rate, but the current flow just has one extra
    // thing emitted at the start, the effect is that zip will cache the most recent value while
    // waiting for the next emission from nextEvents.
    return zip(nextEvents, transform)
}

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
 * [initialValue] will be used as the "old" value for the first emission.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T, R> Flow<T>.pairwiseBy(
    initialValue: T,
    transform: suspend (previousValue: T, newValue: T) -> R,
): Flow<R> =
    onStart { emit(initialValue) }.pairwiseBy(transform)

/**
 * Returns a new [Flow] that produces the two most recent emissions from [this]. Note that the new
 * Flow will not start emitting until it has received two emissions from the upstream Flow.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T> Flow<T>.pairwise(): Flow<WithPrev<T>> = pairwiseBy(::WithPrev)

/**
 * Returns a new [Flow] that produces the two most recent emissions from [this]. [initialValue]
 * will be used as the "old" value for the first emission.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T> Flow<T>.pairwise(initialValue: T): Flow<WithPrev<T>> = pairwiseBy(initialValue, ::WithPrev)

/** Holds a [newValue] emitted from a [Flow], along with the [previousValue] emitted value. */
data class WithPrev<T>(val previousValue: T, val newValue: T)

/**
 * Returns a new [Flow] that combines the [Set] changes between each emission from [this] using
 * [transform].
 */
fun <T, R> Flow<Set<T>>.setChangesBy(
    transform: suspend (removed: Set<T>, added: Set<T>) -> R,
): Flow<R> = onStart { emit(emptySet()) }.distinctUntilChanged()
    .pairwiseBy { old: Set<T>, new: Set<T> ->
        // If an element was present in the old set, but not the new one, then it was removed
        val removed = old - new
        // If an element is present in the new set, but on the old one, then it was added
        val added = new - old
        transform(removed, added)
    }

/** Returns a new [Flow] that produces the [Set] changes between each emission from [this]. */
fun <T> Flow<Set<T>>.setChanges(): Flow<SetChanges<T>> = setChangesBy(::SetChanges)

/** Contains the difference in elements between two [Set]s. */
data class SetChanges<T>(
    /** Elements that are present in the first [Set] but not in the second. */
    val removed: Set<T>,
    /** Elements that are present in the second [Set] but not in the first. */
    val added: Set<T>,
)
+141 −0
Original line number Original line Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.systemui.util.kotlin

import android.testing.AndroidTestingRunner
import androidx.test.filters.SmallTest
import com.android.systemui.SysuiTestCase
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Test
import org.junit.runner.RunWith

@SmallTest
@RunWith(AndroidTestingRunner::class)
class PairwiseFlowTest : SysuiTestCase() {
    @Test
    fun simple() = runBlocking {
        assertThatFlow((1..3).asFlow().pairwise())
            .emitsExactly(
                WithPrev(1, 2),
                WithPrev(2, 3),
            )
    }

    @Test
    fun notEnough() = runBlocking {
        assertThatFlow(flowOf(1).pairwise()).emitsNothing()
    }

    @Test
    fun withInit() = runBlocking {
        assertThatFlow(flowOf(2).pairwise(initialValue = 1))
            .emitsExactly(WithPrev(1, 2))
    }

    @Test
    fun notEnoughWithInit() = runBlocking {
        assertThatFlow(emptyFlow<Int>().pairwise(initialValue = 1)).emitsNothing()
    }

    @Test
    fun withStateFlow() = runBlocking(Dispatchers.Main.immediate) {
        val state = MutableStateFlow(1)
        val stop = MutableSharedFlow<Unit>()

        val stoppable = merge(state, stop)
            .takeWhile { it is Int }
            .filterIsInstance<Int>()

        val job1 = launch {
            assertThatFlow(stoppable.pairwise()).emitsExactly(WithPrev(1, 2))
        }
        state.value = 2
        val job2 = launch { assertThatFlow(stoppable.pairwise()).emitsNothing() }

        stop.emit(Unit)

        assertThatJob(job1).isCompleted()
        assertThatJob(job2).isCompleted()
    }
}

@SmallTest
@RunWith(AndroidTestingRunner::class)
class SetChangesFlowTest : SysuiTestCase() {
    @Test
    fun simple() = runBlocking {
        assertThatFlow(
            flowOf(setOf(1, 2, 3), setOf(2, 3, 4)).setChanges()
        ).emitsExactly(
            SetChanges(
                added = setOf(1, 2, 3),
                removed = emptySet(),
            ),
            SetChanges(
                added = setOf(4),
                removed = setOf(1),
            ),
        )
    }

    @Test
    fun onlyOneEmission() = runBlocking {
        assertThatFlow(flowOf(setOf(1)).setChanges())
            .emitsExactly(
                SetChanges(
                    added = setOf(1),
                    removed = emptySet(),
                )
            )
    }

    @Test
    fun fromEmptySet() = runBlocking {
        assertThatFlow(flowOf(emptySet(), setOf(1, 2)).setChanges())
            .emitsExactly(
                SetChanges(
                    removed = emptySet(),
                    added = setOf(1, 2),
                )
            )
    }
}

private fun <T> assertThatFlow(flow: Flow<T>) = object {
    suspend fun emitsExactly(vararg emissions: T) =
        assertThat(flow.toList()).containsExactly(*emissions).inOrder()
    suspend fun emitsNothing() =
        assertThat(flow.toList()).isEmpty()
}

private fun assertThatJob(job: Job) = object {
    fun isCompleted() = assertThat(job.isCompleted).isTrue()
}