Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 93470241 authored by Govinda Wasserman's avatar Govinda Wasserman Committed by Android (Google) Code Review
Browse files

Merge "Creates throttle function for flows" into tm-qpr-dev

parents 94eb9b76 6f4029a5
Loading
Loading
Loading
Loading
+93 −0
Original line number Original line Diff line number Diff line
@@ -16,14 +16,21 @@


package com.android.systemui.util.kotlin
package com.android.systemui.util.kotlin


import com.android.systemui.util.time.SystemClock
import com.android.systemui.util.time.SystemClockImpl
import kotlinx.coroutines.CoroutineStart
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.launch
import kotlin.math.max


/**
/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
@@ -167,3 +174,89 @@ fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Fl
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 */
 */
fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }
fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }

/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
 * given [periodMs]. If the original flow emits more than one value during this period, only the
 * latest value is emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 */
fun <T> Flow<T>.throttle(periodMs: Long): Flow<T> = this.throttle(periodMs, SystemClockImpl())

/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
 * given [periodMs] as reported by the given [clock]. If the original flow emits more than one value
 * during this period, only The latest value is emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 */
fun <T> Flow<T>.throttle(periodMs: Long, clock: SystemClock): Flow<T> = channelFlow {
    coroutineScope {
        var previousEmitTimeMs = 0L
        var delayJob: Job? = null
        var sendJob: Job? = null
        val outerScope = this

        collect {
            delayJob?.cancel()
            sendJob?.join()
            val currentTimeMs = clock.elapsedRealtime()
            val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
            val timeUntilNextEmit = max(0L, periodMs - timeSinceLastEmit)
            if (timeUntilNextEmit > 0L) {
                // We create delayJob to allow cancellation during the delay period
                delayJob = launch {
                    delay(timeUntilNextEmit)
                    sendJob = outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
                        send(it)
                        previousEmitTimeMs = clock.elapsedRealtime()
                    }
                }
            } else {
                send(it)
                previousEmitTimeMs = currentTimeMs
            }
        }
    }
}
 No newline at end of file
+171 −0
Original line number Original line Diff line number Diff line
@@ -19,9 +19,12 @@ package com.android.systemui.util.kotlin
import android.testing.AndroidTestingRunner
import android.testing.AndroidTestingRunner
import androidx.test.filters.SmallTest
import androidx.test.filters.SmallTest
import com.android.systemui.SysuiTestCase
import com.android.systemui.SysuiTestCase
import com.android.systemui.util.time.FakeSystemClock
import com.google.common.truth.Truth.assertThat
import com.google.common.truth.Truth.assertThat
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.Job
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.MutableStateFlow
@@ -35,6 +38,10 @@ import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.yield
import kotlinx.coroutines.yield
import org.junit.Test
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runner.RunWith
@@ -231,6 +238,170 @@ class SampleFlowTest : SysuiTestCase() {
    }
    }
}
}


@OptIn(ExperimentalCoroutinesApi::class)
@SmallTest
@RunWith(AndroidTestingRunner::class)
class ThrottleFlowTest : SysuiTestCase() {

    @Test
    fun doesNotAffectEmissions_whenDelayAtLeastEqualToPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(1000)
                emit(2)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(999)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 2)

        // Cleanup
        collectJob.cancel()
    }

    @Test
    fun delaysEmissions_withShorterThanPeriodDelay_untilPeriodElapses() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(499)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 2)

        // Cleanup
        collectJob.cancel()
    }

    @Test
    fun filtersAllButLastEmission_whenMultipleEmissionsInPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
                delay(500)
                emit(3)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(499)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 3)

        // Cleanup
        collectJob.cancel()
    }

    @Test
    fun filtersAllButLastEmission_andDelaysIt_whenMultipleEmissionsInShorterThanPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
                delay(250)
                emit(3)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(250)
        choreographer.advanceAndRun(249)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 3)

        // Cleanup
        collectJob.cancel()
    }

    private fun createChoreographer(testScope: TestScope) = object {
        val fakeClock = FakeSystemClock()

        fun advanceAndRun(millis: Long) {
            fakeClock.advanceTime(millis)
            testScope.advanceTimeBy(millis)
            testScope.runCurrent()
        }
    }
}

private fun <T> assertThatFlow(flow: Flow<T>) =
private fun <T> assertThatFlow(flow: Flow<T>) =
    object {
    object {
        suspend fun emitsExactly(vararg emissions: T) =
        suspend fun emitsExactly(vararg emissions: T) =