Loading packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt +36 −0 Original line number Diff line number Diff line Loading @@ -16,11 +16,15 @@ package com.android.systemui.util.kotlin import java.util.concurrent.atomic.AtomicReference import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch /** * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform]. Loading Loading @@ -116,3 +120,35 @@ data class SetChanges<T>( /** Elements that are present in the second [Set] but not in the first. */ val added: Set<T>, ) /** * Returns a new [Flow] that emits at the same rate as [this], but combines the emitted value with * the most recent emission from [other] using [transform]. * * Note that the returned Flow will not emit anything until [other] has emitted at least one value. */ fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Flow<C> = flow { coroutineScope { val noVal = Any() val sampledRef = AtomicReference(noVal) val job = launch(Dispatchers.Unconfined) { other.collect { sampledRef.set(it) } } collect { val sampled = sampledRef.get() if (sampled != noVal) { @Suppress("UNCHECKED_CAST") emit(transform(it, sampled as B)) } } job.cancel() } } /** * Returns a new [Flow] that emits at the same rate as [this], but emits the most recently emitted * value from [other] instead. * * Note that the returned Flow will not emit anything until [other] has emitted at least one value. */ fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a } packages/SystemUI/tests/src/com/android/systemui/util/kotlin/FlowUtilTests.kt +38 −0 Original line number Diff line number Diff line Loading @@ -28,12 +28,14 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.yield import org.junit.Test import org.junit.runner.RunWith Loading Loading @@ -140,6 +142,42 @@ class SetChangesFlowTest : SysuiTestCase() { } } @SmallTest @RunWith(AndroidTestingRunner::class) class SampleFlowTest : SysuiTestCase() { @Test fun simple() = runBlocking { assertThatFlow(flow { yield(); emit(1) }.sample(flowOf(2)) { a, b -> a to b }) .emitsExactly(1 to 2) } @Test fun otherFlowNoValueYet() = runBlocking { assertThatFlow(flowOf(1).sample(emptyFlow<Unit>())) .emitsNothing() } @Test fun multipleSamples() = runBlocking { val samplee = MutableSharedFlow<Int>() val sampler = flow { emit(1) samplee.emit(1) emit(2) samplee.emit(2) samplee.emit(3) emit(3) emit(4) } assertThatFlow(sampler.sample(samplee) { a, b -> a to b }) .emitsExactly( 2 to 1, 3 to 3, 4 to 3, ) } } private fun <T> assertThatFlow(flow: Flow<T>) = object { suspend fun emitsExactly(vararg emissions: T) = assertThat(flow.toList()).containsExactly(*emissions).inOrder() Loading Loading
packages/SystemUI/src/com/android/systemui/util/kotlin/Flow.kt +36 −0 Original line number Diff line number Diff line Loading @@ -16,11 +16,15 @@ package com.android.systemui.util.kotlin import java.util.concurrent.atomic.AtomicReference import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.onStart import kotlinx.coroutines.launch /** * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform]. Loading Loading @@ -116,3 +120,35 @@ data class SetChanges<T>( /** Elements that are present in the second [Set] but not in the first. */ val added: Set<T>, ) /** * Returns a new [Flow] that emits at the same rate as [this], but combines the emitted value with * the most recent emission from [other] using [transform]. * * Note that the returned Flow will not emit anything until [other] has emitted at least one value. */ fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Flow<C> = flow { coroutineScope { val noVal = Any() val sampledRef = AtomicReference(noVal) val job = launch(Dispatchers.Unconfined) { other.collect { sampledRef.set(it) } } collect { val sampled = sampledRef.get() if (sampled != noVal) { @Suppress("UNCHECKED_CAST") emit(transform(it, sampled as B)) } } job.cancel() } } /** * Returns a new [Flow] that emits at the same rate as [this], but emits the most recently emitted * value from [other] instead. * * Note that the returned Flow will not emit anything until [other] has emitted at least one value. */ fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }
packages/SystemUI/tests/src/com/android/systemui/util/kotlin/FlowUtilTests.kt +38 −0 Original line number Diff line number Diff line Loading @@ -28,12 +28,14 @@ import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asFlow import kotlinx.coroutines.flow.emptyFlow import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.flow import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.merge import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.flow.toList import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import kotlinx.coroutines.yield import org.junit.Test import org.junit.runner.RunWith Loading Loading @@ -140,6 +142,42 @@ class SetChangesFlowTest : SysuiTestCase() { } } @SmallTest @RunWith(AndroidTestingRunner::class) class SampleFlowTest : SysuiTestCase() { @Test fun simple() = runBlocking { assertThatFlow(flow { yield(); emit(1) }.sample(flowOf(2)) { a, b -> a to b }) .emitsExactly(1 to 2) } @Test fun otherFlowNoValueYet() = runBlocking { assertThatFlow(flowOf(1).sample(emptyFlow<Unit>())) .emitsNothing() } @Test fun multipleSamples() = runBlocking { val samplee = MutableSharedFlow<Int>() val sampler = flow { emit(1) samplee.emit(1) emit(2) samplee.emit(2) samplee.emit(3) emit(3) emit(4) } assertThatFlow(sampler.sample(samplee) { a, b -> a to b }) .emitsExactly( 2 to 1, 3 to 3, 4 to 3, ) } } private fun <T> assertThatFlow(flow: Flow<T>) = object { suspend fun emitsExactly(vararg emissions: T) = assertThat(flow.toList()).containsExactly(*emissions).inOrder() Loading