Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 35b9b241 authored by Treehugger Robot's avatar Treehugger Robot Committed by Android (Google) Code Review
Browse files

Merge "Address NPE exception occuring in slidingFlow helper" into main

parents 73a6c605 54825509
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@ import com.android.systemui.communal.posturing.domain.interactor.advanceTimeBySl
import com.android.systemui.dock.DockManager
import com.android.systemui.dock.fakeDockManager
import com.android.systemui.kosmos.Kosmos
import com.android.systemui.kosmos.advanceTimeBy
import com.android.systemui.kosmos.collectLastValue
import com.android.systemui.kosmos.runTest
import com.android.systemui.kosmos.useUnconfinedTestDispatcher
@@ -42,6 +43,7 @@ import com.android.systemui.user.data.repository.FakeUserRepository.Companion.MA
import com.android.systemui.user.data.repository.fakeUserRepository
import com.android.systemui.util.settings.fakeSettings
import com.google.common.truth.Truth.assertThat
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.runBlocking
import org.junit.Before
import org.junit.Test
@@ -145,6 +147,7 @@ class CommunalAutoOpenInteractorTest : SysuiTestCase() {
                    SuppressionReason.ReasonWhenToAutoShow(FEATURE_AUTO_OPEN or FEATURE_MANUAL_OPEN)
                )

            advanceTimeBy(1.milliseconds)
            posturingRepository.fake.emitPositionState(
                PositionState(
                    stationary = PositionState.StationaryState.Stationary(confidence = 1f),
+54 −15
Original line number Diff line number Diff line
@@ -19,7 +19,6 @@ package com.android.systemui.util.kotlin
import com.android.app.tracing.coroutines.launchTraced as launch
import com.android.systemui.util.time.SystemClock
import com.android.systemui.util.time.SystemClockImpl
import java.util.LinkedList
import java.util.concurrent.atomic.AtomicReference
import kotlin.math.max
import kotlin.time.Duration
@@ -37,6 +36,8 @@ import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
@@ -388,7 +389,11 @@ fun <T> Flow<T>.slidingWindow(
    clock: SystemClock = SystemClockImpl(),
): Flow<List<T>> = channelFlow {
    require(windowDuration.isPositive()) { "Window duration must be positive" }
    val buffer = LinkedList<Pair<Duration, T>>()

    // Use a Mutex to protect access to the buffer in case this flow is collected on a
    // multi-threaded dispatcher.
    val bufferMutex = Mutex()
    val buffer = ArrayDeque<Pair<Duration, T>>()

    coroutineScope {
        var windowAdvancementJob: Job? = null
@@ -396,29 +401,63 @@ fun <T> Flow<T>.slidingWindow(
        collect { value ->
            windowAdvancementJob?.cancel()
            val now = clock.currentTimeMillis().milliseconds

            bufferMutex.withLock {
                buffer.addLast(now to value)

            while (buffer.isNotEmpty() && buffer.first.first + windowDuration <= now) {
                while (buffer.isNotEmpty() && buffer.first().first + windowDuration <= now) {
                    buffer.removeFirst()
                }
                send(buffer.map { it.second })
            }

            // Keep the window advancing through time even if the source flow isn't emitting
            // anymore. We stop advancing the window as soon as there are no items left in the
            // buffer.
            windowAdvancementJob = launch {
                while (buffer.isNotEmpty()) {
                    val startOfWindow = clock.currentTimeMillis().milliseconds - windowDuration
                    // Invariant: At this point, everything in the buffer is guaranteed to be in
                    // the window, as we removed expired items above.
                    val timeUntilNextOldest =
                        (buffer.first.first - startOfWindow).coerceAtLeast(0.milliseconds)
                while (true) {
                    // Acquire lock to check buffer state and calculate delay
                    val timeUntilNextOldest: Duration? =
                        bufferMutex.withLock {
                            // If buffer is empty, the job is done
                            if (buffer.isEmpty()) {
                                return@withLock null
                            }

                            // Calculate how long until the oldest element expires
                            val nowMillis = clock.currentTimeMillis().milliseconds
                            val oldestElementTime = buffer.first().first
                            val windowStartTime = nowMillis - windowDuration

                            // Time until the oldest element falls out of the window
                            (oldestElementTime - windowStartTime).coerceAtLeast(Duration.ZERO)
                        }

                    if (timeUntilNextOldest == null) {
                        break
                    }

                    // Delay until the oldest item is *supposed* to expire
                    delay(timeUntilNextOldest)
                    // Remove the oldest item, as it has now fallen out of the window.

                    // Acquire lock again to remove the expired item (if it's still the oldest)
                    // and send the updated buffer state
                    bufferMutex.withLock {
                        val nowMillis = clock.currentTimeMillis().milliseconds
                        var removed = false
                        while (
                            buffer.isNotEmpty() &&
                                buffer.first().first + windowDuration <= nowMillis
                        ) {
                            buffer.removeFirst()
                            removed = true
                        }
                        if (removed) {
                            send(buffer.map { it.second })
                        }
                    }
                }
            }
        }
    }
}