Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 86d52433 authored by Steve Elliott's avatar Steve Elliott
Browse files

Add sample() Flow utility function

Bug: 241121499
Test: atest SampleFlowTest
Change-Id: I4e5d39bd68560462a496968dcaf55cee98b68ae1
parent 91c87a93
Loading
Loading
Loading
Loading
+36 −0
Original line number Diff line number Diff line
@@ -16,11 +16,15 @@

package com.android.systemui.util.kotlin

import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
@@ -116,3 +120,35 @@ data class SetChanges<T>(
    /** Elements that are present in the second [Set] but not in the first. */
    val added: Set<T>,
)

/**
 * Returns a new [Flow] that emits at the same rate as [this], but combines the emitted value with
 * the most recent emission from [other] using [transform].
 *
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 */
fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Flow<C> = flow {
    coroutineScope {
        val noVal = Any()
        val sampledRef = AtomicReference(noVal)
        val job = launch(Dispatchers.Unconfined) {
            other.collect { sampledRef.set(it) }
        }
        collect {
            val sampled = sampledRef.get()
            if (sampled != noVal) {
                @Suppress("UNCHECKED_CAST")
                emit(transform(it, sampled as B))
            }
        }
        job.cancel()
    }
}

/**
 * Returns a new [Flow] that emits at the same rate as [this], but emits the most recently emitted
 * value from [other] instead.
 *
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 */
fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }
+38 −0
Original line number Diff line number Diff line
@@ -28,12 +28,14 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield
import org.junit.Test
import org.junit.runner.RunWith

@@ -140,6 +142,42 @@ class SetChangesFlowTest : SysuiTestCase() {
    }
}

@SmallTest
@RunWith(AndroidTestingRunner::class)
class SampleFlowTest : SysuiTestCase() {
    @Test
    fun simple() = runBlocking {
        assertThatFlow(flow { yield(); emit(1) }.sample(flowOf(2)) { a, b -> a to b })
            .emitsExactly(1 to 2)
    }

    @Test
    fun otherFlowNoValueYet() = runBlocking {
        assertThatFlow(flowOf(1).sample(emptyFlow<Unit>()))
            .emitsNothing()
    }

    @Test
    fun multipleSamples() = runBlocking {
        val samplee = MutableSharedFlow<Int>()
        val sampler = flow {
            emit(1)
            samplee.emit(1)
            emit(2)
            samplee.emit(2)
            samplee.emit(3)
            emit(3)
            emit(4)
        }
        assertThatFlow(sampler.sample(samplee) { a, b -> a to b })
            .emitsExactly(
                2 to 1,
                3 to 3,
                4 to 3,
            )
    }
}

private fun <T> assertThatFlow(flow: Flow<T>) = object {
    suspend fun emitsExactly(vararg emissions: T) =
        assertThat(flow.toList()).containsExactly(*emissions).inOrder()