Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 54825509 authored by Lucas Silva's avatar Lucas Silva
Browse files

Address NPE exception occuring in slidingFlow helper

Since we never add null values to the buffer, it looks like this might
be happening due to concurrency issues - since the LinkedList buffer
isn't thread-safe and we may be accessing it across threads if the flow
is collected using a multi-threaded dispatcher.

Bug: 410013484
Test: atest SystemUITests:SlidingWindowFlowTest
Flag: EXEMPT bugfix
Change-Id: Ic696d30a3ea351d475f4dfe8e41d2135524003cb
parent 9557e44e
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -34,6 +34,7 @@ import com.android.systemui.communal.posturing.domain.interactor.advanceTimeBySl
import com.android.systemui.dock.DockManager
import com.android.systemui.dock.fakeDockManager
import com.android.systemui.kosmos.Kosmos
import com.android.systemui.kosmos.advanceTimeBy
import com.android.systemui.kosmos.collectLastValue
import com.android.systemui.kosmos.runTest
import com.android.systemui.kosmos.useUnconfinedTestDispatcher
@@ -42,6 +43,7 @@ import com.android.systemui.user.data.repository.FakeUserRepository.Companion.MA
import com.android.systemui.user.data.repository.fakeUserRepository
import com.android.systemui.util.settings.fakeSettings
import com.google.common.truth.Truth.assertThat
import kotlin.time.Duration.Companion.milliseconds
import kotlinx.coroutines.runBlocking
import org.junit.Before
import org.junit.Test
@@ -145,6 +147,7 @@ class CommunalAutoOpenInteractorTest : SysuiTestCase() {
                    SuppressionReason.ReasonWhenToAutoShow(FEATURE_AUTO_OPEN or FEATURE_MANUAL_OPEN)
                )

            advanceTimeBy(1.milliseconds)
            posturingRepository.fake.emitPositionState(
                PositionState(
                    stationary = PositionState.StationaryState.Stationary(confidence = 1f),
+54 −15
Original line number Diff line number Diff line
@@ -19,7 +19,6 @@ package com.android.systemui.util.kotlin
import com.android.app.tracing.coroutines.launchTraced as launch
import com.android.systemui.util.time.SystemClock
import com.android.systemui.util.time.SystemClockImpl
import java.util.LinkedList
import java.util.concurrent.atomic.AtomicReference
import kotlin.math.max
import kotlin.time.Duration
@@ -37,6 +36,8 @@ import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
@@ -388,7 +389,11 @@ fun <T> Flow<T>.slidingWindow(
    clock: SystemClock = SystemClockImpl(),
): Flow<List<T>> = channelFlow {
    require(windowDuration.isPositive()) { "Window duration must be positive" }
    val buffer = LinkedList<Pair<Duration, T>>()

    // Use a Mutex to protect access to the buffer in case this flow is collected on a
    // multi-threaded dispatcher.
    val bufferMutex = Mutex()
    val buffer = ArrayDeque<Pair<Duration, T>>()

    coroutineScope {
        var windowAdvancementJob: Job? = null
@@ -396,29 +401,63 @@ fun <T> Flow<T>.slidingWindow(
        collect { value ->
            windowAdvancementJob?.cancel()
            val now = clock.currentTimeMillis().milliseconds

            bufferMutex.withLock {
                buffer.addLast(now to value)

            while (buffer.isNotEmpty() && buffer.first.first + windowDuration <= now) {
                while (buffer.isNotEmpty() && buffer.first().first + windowDuration <= now) {
                    buffer.removeFirst()
                }
                send(buffer.map { it.second })
            }

            // Keep the window advancing through time even if the source flow isn't emitting
            // anymore. We stop advancing the window as soon as there are no items left in the
            // buffer.
            windowAdvancementJob = launch {
                while (buffer.isNotEmpty()) {
                    val startOfWindow = clock.currentTimeMillis().milliseconds - windowDuration
                    // Invariant: At this point, everything in the buffer is guaranteed to be in
                    // the window, as we removed expired items above.
                    val timeUntilNextOldest =
                        (buffer.first.first - startOfWindow).coerceAtLeast(0.milliseconds)
                while (true) {
                    // Acquire lock to check buffer state and calculate delay
                    val timeUntilNextOldest: Duration? =
                        bufferMutex.withLock {
                            // If buffer is empty, the job is done
                            if (buffer.isEmpty()) {
                                return@withLock null
                            }

                            // Calculate how long until the oldest element expires
                            val nowMillis = clock.currentTimeMillis().milliseconds
                            val oldestElementTime = buffer.first().first
                            val windowStartTime = nowMillis - windowDuration

                            // Time until the oldest element falls out of the window
                            (oldestElementTime - windowStartTime).coerceAtLeast(Duration.ZERO)
                        }

                    if (timeUntilNextOldest == null) {
                        break
                    }

                    // Delay until the oldest item is *supposed* to expire
                    delay(timeUntilNextOldest)
                    // Remove the oldest item, as it has now fallen out of the window.

                    // Acquire lock again to remove the expired item (if it's still the oldest)
                    // and send the updated buffer state
                    bufferMutex.withLock {
                        val nowMillis = clock.currentTimeMillis().milliseconds
                        var removed = false
                        while (
                            buffer.isNotEmpty() &&
                                buffer.first().first + windowDuration <= nowMillis
                        ) {
                            buffer.removeFirst()
                            removed = true
                        }
                        if (removed) {
                            send(buffer.map { it.second })
                        }
                    }
                }
            }
        }
    }
}