Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 5b517957 authored by Govinda Wasserman's avatar Govinda Wasserman Committed by Automerger Merge Worker
Browse files

Merge "Creates throttle function for flows" into tm-qpr-dev am: 93470241

parents fbff9f55 93470241
Loading
Loading
Loading
Loading
+93 −0
Original line number Diff line number Diff line
@@ -16,14 +16,21 @@

package com.android.systemui.util.kotlin

import com.android.systemui.util.time.SystemClock
import com.android.systemui.util.time.SystemClockImpl
import kotlinx.coroutines.CoroutineStart
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlin.math.max

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
@@ -167,3 +174,89 @@ fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Fl
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 */
fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }

/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
 * given [periodMs]. If the original flow emits more than one value during this period, only the
 * latest value is emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 */
fun <T> Flow<T>.throttle(periodMs: Long): Flow<T> = this.throttle(periodMs, SystemClockImpl())

/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
 * given [periodMs] as reported by the given [clock]. If the original flow emits more than one value
 * during this period, only The latest value is emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 */
fun <T> Flow<T>.throttle(periodMs: Long, clock: SystemClock): Flow<T> = channelFlow {
    coroutineScope {
        var previousEmitTimeMs = 0L
        var delayJob: Job? = null
        var sendJob: Job? = null
        val outerScope = this

        collect {
            delayJob?.cancel()
            sendJob?.join()
            val currentTimeMs = clock.elapsedRealtime()
            val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
            val timeUntilNextEmit = max(0L, periodMs - timeSinceLastEmit)
            if (timeUntilNextEmit > 0L) {
                // We create delayJob to allow cancellation during the delay period
                delayJob = launch {
                    delay(timeUntilNextEmit)
                    sendJob = outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
                        send(it)
                        previousEmitTimeMs = clock.elapsedRealtime()
                    }
                }
            } else {
                send(it)
                previousEmitTimeMs = currentTimeMs
            }
        }
    }
}
 No newline at end of file
+171 −0
Original line number Diff line number Diff line
@@ -19,9 +19,12 @@ package com.android.systemui.util.kotlin
import android.testing.AndroidTestingRunner
import androidx.test.filters.SmallTest
import com.android.systemui.SysuiTestCase
import com.android.systemui.util.time.FakeSystemClock
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
@@ -35,6 +38,10 @@ import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield
import org.junit.Test
import org.junit.runner.RunWith
@@ -231,6 +238,170 @@ class SampleFlowTest : SysuiTestCase() {
    }
}

@OptIn(ExperimentalCoroutinesApi::class)
@SmallTest
@RunWith(AndroidTestingRunner::class)
class ThrottleFlowTest : SysuiTestCase() {

    @Test
    fun doesNotAffectEmissions_whenDelayAtLeastEqualToPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(1000)
                emit(2)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(999)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 2)

        // Cleanup
        collectJob.cancel()
    }

    @Test
    fun delaysEmissions_withShorterThanPeriodDelay_untilPeriodElapses() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(499)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 2)

        // Cleanup
        collectJob.cancel()
    }

    @Test
    fun filtersAllButLastEmission_whenMultipleEmissionsInPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
                delay(500)
                emit(3)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(499)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 3)

        // Cleanup
        collectJob.cancel()
    }

    @Test
    fun filtersAllButLastEmission_andDelaysIt_whenMultipleEmissionsInShorterThanPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
                delay(250)
                emit(3)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(250)
        choreographer.advanceAndRun(249)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 3)

        // Cleanup
        collectJob.cancel()
    }

    private fun createChoreographer(testScope: TestScope) = object {
        val fakeClock = FakeSystemClock()

        fun advanceAndRun(millis: Long) {
            fakeClock.advanceTime(millis)
            testScope.advanceTimeBy(millis)
            testScope.runCurrent()
        }
    }
}

private fun <T> assertThatFlow(flow: Flow<T>) =
    object {
        suspend fun emitsExactly(vararg emissions: T) =