Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e8fc27c1 authored by Lucas Silva's avatar Lucas Silva Committed by Android (Google) Code Review
Browse files

Merge "Implement sliding window flow helper" into main

parents cd46292a e5c46767
Loading
Loading
Loading
Loading
+58 −0
Original line number Diff line number Diff line
@@ -19,8 +19,11 @@ package com.android.systemui.util.kotlin
import com.android.app.tracing.coroutines.launchTraced as launch
import com.android.systemui.util.time.SystemClock
import com.android.systemui.util.time.SystemClockImpl
import java.util.LinkedList
import java.util.concurrent.atomic.AtomicReference
import kotlin.math.max
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
@@ -364,3 +367,58 @@ inline fun <T1, T2, T3, T4, T5, T6, T7, T8, T9, R> combine(
 */
@Suppress("NOTHING_TO_INLINE")
inline fun Flow<Unit>.emitOnStart(): Flow<Unit> = onStart { emit(Unit) }

/**
 * Transforms a Flow<T> into a Flow<List<T>> by implementing a sliding window algorithm.
 *
 * This function creates a sliding window over the input Flow<T>. The window has a specified
 * [windowDuration] and slides continuously as time progresses. The emitted List<T> contains all
 * items from the input flow that fall within the current window.
 *
 * The window slides forward by the smallest possible increment to include or exclude *one* event
 * based on the time the event was emitted (determined by the System.currentTimeMillis()). This
 * means that consecutive emitted lists will have overlapping elements if the elements fall within
 * the [windowDuration]
 *
 * @param windowDuration The duration of the sliding window.
 * @return A Flow that emits Lists of elements within the current sliding window.
 */
fun <T> Flow<T>.slidingWindow(
    windowDuration: Duration,
    clock: SystemClock = SystemClockImpl(),
): Flow<List<T>> = channelFlow {
    require(windowDuration.isPositive()) { "Window duration must be positive" }
    val buffer = LinkedList<Pair<Duration, T>>()

    coroutineScope {
        var windowAdvancementJob: Job? = null

        collect { value ->
            windowAdvancementJob?.cancel()
            val now = clock.currentTimeMillis().milliseconds
            buffer.addLast(now to value)

            while (buffer.isNotEmpty() && buffer.first.first + windowDuration <= now) {
                buffer.removeFirst()
            }
            send(buffer.map { it.second })

            // Keep the window advancing through time even if the source flow isn't emitting
            // anymore. We stop advancing the window as soon as there are no items left in the
            // buffer.
            windowAdvancementJob = launch {
                while (buffer.isNotEmpty()) {
                    val startOfWindow = clock.currentTimeMillis().milliseconds - windowDuration
                    // Invariant: At this point, everything in the buffer is guaranteed to be in
                    // the window, as we removed expired items above.
                    val timeUntilNextOldest =
                        (buffer.first.first - startOfWindow).coerceAtLeast(0.milliseconds)
                    delay(timeUntilNextOldest)
                    // Remove the oldest item, as it has now fallen out of the window.
                    buffer.removeFirst()
                    send(buffer.map { it.second })
                }
            }
        }
    }
}
+261 −36
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import androidx.test.filters.SmallTest
import com.android.systemui.SysuiTestCase
import com.android.systemui.util.time.FakeSystemClock
import com.google.common.truth.Truth.assertThat
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
@@ -33,6 +34,7 @@ import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
@@ -50,11 +52,7 @@ import org.junit.runner.RunWith
class PairwiseFlowTest : SysuiTestCase() {
    @Test
    fun simple() = runBlocking {
        assertThatFlow((1..3).asFlow().pairwise())
            .emitsExactly(
                WithPrev(1, 2),
                WithPrev(2, 3),
            )
        assertThatFlow((1..3).asFlow().pairwise()).emitsExactly(WithPrev(1, 2), WithPrev(2, 3))
    }

    @Test fun notEnough() = runBlocking { assertThatFlow(flowOf(1).pairwise()).emitsNothing() }
@@ -157,48 +155,27 @@ class SetChangesFlowTest : SysuiTestCase() {
    fun simple() = runBlocking {
        assertThatFlow(flowOf(setOf(1, 2, 3), setOf(2, 3, 4)).setChanges())
            .emitsExactly(
                SetChanges(
                    added = setOf(1, 2, 3),
                    removed = emptySet(),
                ),
                SetChanges(
                    added = setOf(4),
                    removed = setOf(1),
                ),
                SetChanges(added = setOf(1, 2, 3), removed = emptySet()),
                SetChanges(added = setOf(4), removed = setOf(1)),
            )
    }

    @Test
    fun onlyOneEmission() = runBlocking {
        assertThatFlow(flowOf(setOf(1)).setChanges())
            .emitsExactly(
                SetChanges(
                    added = setOf(1),
                    removed = emptySet(),
                )
            )
            .emitsExactly(SetChanges(added = setOf(1), removed = emptySet()))
    }

    @Test
    fun fromEmptySet() = runBlocking {
        assertThatFlow(flowOf(emptySet(), setOf(1, 2)).setChanges())
            .emitsExactly(
                SetChanges(
                    removed = emptySet(),
                    added = setOf(1, 2),
                )
            )
            .emitsExactly(SetChanges(removed = emptySet(), added = setOf(1, 2)))
    }

    @Test
    fun dontEmitFirstEvent() = runBlocking {
        assertThatFlow(flowOf(setOf(1, 2), setOf(2, 3)).setChanges(emitFirstEvent = false))
            .emitsExactly(
                SetChanges(
                    removed = setOf(1),
                    added = setOf(3),
                )
            )
            .emitsExactly(SetChanges(removed = setOf(1), added = setOf(3)))
    }
}

@@ -235,11 +212,7 @@ class SampleFlowTest : SysuiTestCase() {
            emit(4)
        }
        assertThatFlow(sampler.sample(samplee) { a, b -> a to b })
            .emitsExactly(
                2 to 1,
                3 to 3,
                4 to 3,
            )
            .emitsExactly(2 to 1, 3 to 3, 4 to 3)
    }
}

@@ -419,10 +392,262 @@ class ThrottleFlowTest : SysuiTestCase() {
        }
}

@SmallTest
@RunWith(AndroidJUnit4::class)
class SlidingWindowFlowTest : SysuiTestCase() {

    @Test
    fun basicWindowing() = runTest {
        val choreographer = createChoreographer(this)
        val output = mutableListOf<List<Int>>()
        val collectJob =
            backgroundScope.launch {
                (1..5)
                    .asFlow()
                    .onEach { delay(100) }
                    .slidingWindow(300.milliseconds, choreographer.fakeClock)
                    .toList(output)
            }

        choreographer.advanceAndRun(0)
        assertThat(output).isEmpty()

        choreographer.advanceAndRun(100)
        assertThat(output).containsExactly(listOf(1))

        choreographer.advanceAndRun(1)
        assertThat(output).containsExactly(listOf(1))

        choreographer.advanceAndRun(99)
        assertThat(output).containsExactly(listOf(1), listOf(1, 2))

        choreographer.advanceAndRun(100)
        assertThat(output).containsExactly(listOf(1), listOf(1, 2), listOf(1, 2, 3))

        choreographer.advanceAndRun(100)
        assertThat(output)
            .containsExactly(listOf(1), listOf(1, 2), listOf(1, 2, 3), listOf(2, 3, 4))

        choreographer.advanceAndRun(100)
        assertThat(output)
            .containsExactly(
                listOf(1),
                listOf(1, 2),
                listOf(1, 2, 3),
                listOf(2, 3, 4),
                listOf(3, 4, 5),
            )

        choreographer.advanceAndRun(100)
        assertThat(output)
            .containsExactly(
                listOf(1),
                listOf(1, 2),
                listOf(1, 2, 3),
                listOf(2, 3, 4),
                listOf(3, 4, 5),
                listOf(4, 5),
            )

        choreographer.advanceAndRun(100)
        assertThat(output)
            .containsExactly(
                listOf(1),
                listOf(1, 2),
                listOf(1, 2, 3),
                listOf(2, 3, 4),
                listOf(3, 4, 5),
                listOf(4, 5),
                listOf(5),
            )

        choreographer.advanceAndRun(100)
        assertThat(output)
            .containsExactly(
                listOf(1),
                listOf(1, 2),
                listOf(1, 2, 3),
                listOf(2, 3, 4),
                listOf(3, 4, 5),
                listOf(4, 5),
                listOf(5),
                emptyList<Int>(),
            )

        // Verify no more emissions
        choreographer.advanceAndRun(9999999999)
        assertThat(output)
            .containsExactly(
                listOf(1),
                listOf(1, 2),
                listOf(1, 2, 3),
                listOf(2, 3, 4),
                listOf(3, 4, 5),
                listOf(4, 5),
                listOf(5),
                emptyList<Int>(),
            )

        assertThat(collectJob.isCompleted).isTrue()
    }

    @Test
    fun initialEmptyFlow() = runTest {
        val choreographer = createChoreographer(this)
        val output = mutableListOf<List<Int>>()
        val collectJob =
            backgroundScope.launch {
                flow {
                        delay(200)
                        emit(1)
                    }
                    .slidingWindow(100.milliseconds, choreographer.fakeClock)
                    .toList(output)
            }

        choreographer.advanceAndRun(0)
        assertThat(output).isEmpty()

        choreographer.advanceAndRun(200)
        assertThat(output).containsExactly(listOf(1))

        choreographer.advanceAndRun(100)
        assertThat(output).containsExactly(listOf(1), emptyList<Int>())

        assertThat(collectJob.isCompleted).isTrue()
    }

    @Test
    fun windowLargerThanData() = runTest {
        val choreographer = createChoreographer(this)
        val output = mutableListOf<List<Int>>()
        val collectJob =
            backgroundScope.launch {
                (1..3)
                    .asFlow()
                    .onEach { delay(50) }
                    .slidingWindow(500.milliseconds, choreographer.fakeClock)
                    .toList(output)
            }

        choreographer.advanceAndRun(0)
        assertThat(output).isEmpty()

        choreographer.advanceAndRun(50)
        assertThat(output).containsExactly(listOf(1))

        choreographer.advanceAndRun(50)
        assertThat(output).containsExactly(listOf(1), listOf(1, 2))

        choreographer.advanceAndRun(50)
        assertThat(output).containsExactly(listOf(1), listOf(1, 2), listOf(1, 2, 3))

        // It has been 100ms since the first emission, which means we have 400ms left until the
        // first item is evicted from the window. Ensure that we have no evictions until that time.
        choreographer.advanceAndRun(399)
        assertThat(output).containsExactly(listOf(1), listOf(1, 2), listOf(1, 2, 3))

        choreographer.advanceAndRun(1)
        assertThat(output).containsExactly(listOf(1), listOf(1, 2), listOf(1, 2, 3), listOf(2, 3))

        choreographer.advanceAndRun(50)
        assertThat(output)
            .containsExactly(listOf(1), listOf(1, 2), listOf(1, 2, 3), listOf(2, 3), listOf(3))

        choreographer.advanceAndRun(50)
        assertThat(output)
            .containsExactly(
                listOf(1),
                listOf(1, 2),
                listOf(1, 2, 3),
                listOf(2, 3),
                listOf(3),
                emptyList<Int>(),
            )

        assertThat(collectJob.isCompleted).isTrue()
    }

    @Test
    fun dataGapLargerThanWindow() = runTest {
        val choreographer = createChoreographer(this)
        val output = mutableListOf<List<Int>>()
        val collectJob =
            backgroundScope.launch {
                flow {
                        emit(1)
                        delay(200)
                        emit(2)
                        delay(500) // Gap larger than window
                        emit(3)
                    }
                    .slidingWindow(300.milliseconds, choreographer.fakeClock)
                    .toList(output)
            }

        choreographer.advanceAndRun(0)
        assertThat(output).containsExactly(listOf(1))

        choreographer.advanceAndRun(200)
        assertThat(output).containsExactly(listOf(1), listOf(1, 2))

        choreographer.advanceAndRun(100)
        assertThat(output).containsExactly(listOf(1), listOf(1, 2), listOf(2))

        choreographer.advanceAndRun(200)
        assertThat(output).containsExactly(listOf(1), listOf(1, 2), listOf(2), emptyList<Int>())

        choreographer.advanceAndRun(200)
        assertThat(output)
            .containsExactly(listOf(1), listOf(1, 2), listOf(2), emptyList<Int>(), listOf(3))

        choreographer.advanceAndRun(300)
        assertThat(output)
            .containsExactly(
                listOf(1),
                listOf(1, 2),
                listOf(2),
                emptyList<Int>(),
                listOf(3),
                emptyList<Int>(),
            )

        assertThat(collectJob.isCompleted).isTrue()
    }

    @Test
    fun emptyFlow() = runTest {
        val choreographer = createChoreographer(this)
        val output = mutableListOf<List<Int>>()

        val collectJob =
            backgroundScope.launch {
                emptyFlow<Int>().slidingWindow(100.milliseconds).toList(output)
            }

        choreographer.advanceAndRun(0)
        assertThat(output).isEmpty()

        assertThat(collectJob.isCompleted).isTrue()
    }

    private fun createChoreographer(testScope: TestScope) =
        object {
            val fakeClock = FakeSystemClock()

            fun advanceAndRun(millis: Long) {
                fakeClock.advanceTime(millis)
                testScope.advanceTimeBy(millis)
                testScope.runCurrent()
            }
        }
}

private fun <T> assertThatFlow(flow: Flow<T>) =
    object {
        suspend fun emitsExactly(vararg emissions: T) =
            assertThat(flow.toList()).containsExactly(*emissions).inOrder()

        suspend fun emitsNothing() = assertThat(flow.toList()).isEmpty()
    }