Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 6f4029a5 authored by Govinda Wasserman's avatar Govinda Wasserman
Browse files

Creates throttle function for flows

The throttle function works similarly to the debounce function except
that instead of filtering out all values for the period, it instead
slows the rate of emission to at most 1 value per period.

Test: atest ThrottleFlowTest
BUG:265451297
FIX:265451297
Change-Id: I1a7d337ee87e8dc92933d61ebdf32322c7c5befd
parent 9e732e64
Loading
Loading
Loading
Loading
+93 −0
Original line number Diff line number Diff line
@@ -16,14 +16,21 @@

package com.android.systemui.util.kotlin

import com.android.systemui.util.time.SystemClock
import com.android.systemui.util.time.SystemClockImpl
import kotlinx.coroutines.CoroutineStart
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlin.math.max

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
@@ -167,3 +174,89 @@ fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Fl
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 */
fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }

/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
 * given [periodMs]. If the original flow emits more than one value during this period, only the
 * latest value is emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 */
fun <T> Flow<T>.throttle(periodMs: Long): Flow<T> = this.throttle(periodMs, SystemClockImpl())

/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
 * given [periodMs] as reported by the given [clock]. If the original flow emits more than one value
 * during this period, only The latest value is emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 */
fun <T> Flow<T>.throttle(periodMs: Long, clock: SystemClock): Flow<T> = channelFlow {
    coroutineScope {
        var previousEmitTimeMs = 0L
        var delayJob: Job? = null
        var sendJob: Job? = null
        val outerScope = this

        collect {
            delayJob?.cancel()
            sendJob?.join()
            val currentTimeMs = clock.elapsedRealtime()
            val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
            val timeUntilNextEmit = max(0L, periodMs - timeSinceLastEmit)
            if (timeUntilNextEmit > 0L) {
                // We create delayJob to allow cancellation during the delay period
                delayJob = launch {
                    delay(timeUntilNextEmit)
                    sendJob = outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
                        send(it)
                        previousEmitTimeMs = clock.elapsedRealtime()
                    }
                }
            } else {
                send(it)
                previousEmitTimeMs = currentTimeMs
            }
        }
    }
}
 No newline at end of file
+171 −0
Original line number Diff line number Diff line
@@ -19,9 +19,12 @@ package com.android.systemui.util.kotlin
import android.testing.AndroidTestingRunner
import androidx.test.filters.SmallTest
import com.android.systemui.SysuiTestCase
import com.android.systemui.util.time.FakeSystemClock
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
@@ -35,6 +38,10 @@ import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield
import org.junit.Test
import org.junit.runner.RunWith
@@ -231,6 +238,170 @@ class SampleFlowTest : SysuiTestCase() {
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
@SmallTest
@RunWith(AndroidTestingRunner::class)
class ThrottleFlowTest : SysuiTestCase() {

    @Test
    fun doesNotAffectEmissions_whenDelayAtLeastEqualToPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(1000)
                emit(2)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(999)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 2)

        // Cleanup
        collectJob.cancel()
    }

    @Test
    fun delaysEmissions_withShorterThanPeriodDelay_untilPeriodElapses() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(499)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 2)

        // Cleanup
        collectJob.cancel()
    }

    @Test
    fun filtersAllButLastEmission_whenMultipleEmissionsInPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
                delay(500)
                emit(3)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(499)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 3)

        // Cleanup
        collectJob.cancel()
    }

    @Test
    fun filtersAllButLastEmission_andDelaysIt_whenMultipleEmissionsInShorterThanPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
                delay(250)
                emit(3)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(250)
        choreographer.advanceAndRun(249)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 3)

        // Cleanup
        collectJob.cancel()
    }

    private fun createChoreographer(testScope: TestScope) = object {
        val fakeClock = FakeSystemClock()

        fun advanceAndRun(millis: Long) {
            fakeClock.advanceTime(millis)
            testScope.advanceTimeBy(millis)
            testScope.runCurrent()
        }
    }
}

private fun <T> assertThatFlow(flow: Flow<T>) =
    object {
        suspend fun emitsExactly(vararg emissions: T) =