Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9546304e authored by Evan Laird's avatar Evan Laird
Browse files

[Sat] Remove satellite-allowed polling mechanism in favor of callback

Now that there is a callback
(SatelliteManager.registerForCommunicationAllowedStateChanged), and
this callback fires once upon registration, we should be able to replace
the entire polling mechanism with the callback.

We'll still have the same general structure as before, where we wait
to know that satellite is supported and then we'll register with the
underlying service.

Note that we use a Lazily shared strategy here, since I think this is a
better tradeoff such that we're not un- and re- registering too often,
but we also never miss a callback.

Test: DeviceBasedSatelliteRepositoryImplTest
Bug: 348675732
Flag: com.android.internal.telephony.flags.oem_enabled_satellite_flag
Change-Id: I900b6583f7e655c6b441eee57ce298e0ef3e02dd
parent dcd7eb59
Loading
Loading
Loading
Loading
+34 −67
Original line number Diff line number Diff line
@@ -20,6 +20,7 @@ import android.os.OutcomeReceiver
import android.telephony.TelephonyCallback
import android.telephony.TelephonyManager
import android.telephony.satellite.NtnSignalStrengthCallback
import android.telephony.satellite.SatelliteCommunicationAllowedStateCallback
import android.telephony.satellite.SatelliteManager
import android.telephony.satellite.SatelliteManager.SATELLITE_RESULT_SUCCESS
import android.telephony.satellite.SatelliteModemStateCallback
@@ -37,7 +38,6 @@ import com.android.systemui.log.core.MessagePrinter
import com.android.systemui.statusbar.pipeline.dagger.DeviceBasedSatelliteInputLog
import com.android.systemui.statusbar.pipeline.dagger.VerboseDeviceBasedSatelliteInputLog
import com.android.systemui.statusbar.pipeline.satellite.data.RealDeviceBasedSatelliteRepository
import com.android.systemui.statusbar.pipeline.satellite.data.prod.DeviceBasedSatelliteRepositoryImpl.Companion.POLLING_INTERVAL_MS
import com.android.systemui.statusbar.pipeline.satellite.data.prod.SatelliteSupport.Companion.whenSupported
import com.android.systemui.statusbar.pipeline.satellite.data.prod.SatelliteSupport.NotSupported
import com.android.systemui.statusbar.pipeline.satellite.data.prod.SatelliteSupport.Supported
@@ -60,11 +60,9 @@ import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.flow.stateIn
@@ -122,15 +120,9 @@ sealed interface SatelliteSupport {
}

/**
 * Basically your everyday run-of-the-mill system service listener, with three notable exceptions.
 * Basically your everyday run-of-the-mill system service listener, with two notable exceptions.
 *
 * First, there is an availability bit that we are tracking via [SatelliteManager]. See
 * [isSatelliteAllowedForCurrentLocation] for the implementation details. The thing to note about
 * this bit is that there is no callback that exists. Therefore we implement a simple polling
 * mechanism here. Since the underlying bit is location-dependent, we simply poll every hour (see
 * [POLLING_INTERVAL_MS]) and see what the current state is.
 *
 * Secondly, there are cases when simply requesting information from SatelliteManager can fail. See
 * First, there are cases when simply requesting information from SatelliteManager can fail. See
 * [SatelliteSupport] for details on how we track the state. What's worth noting here is that
 * SUPPORTED is a stronger guarantee than [satelliteManager] being null. Therefore, the fundamental
 * data flows here ([connectionState], [signalStrength],...) are wrapped in the convenience method
@@ -138,7 +130,7 @@ sealed interface SatelliteSupport {
 * [SupportedSatelliteManager], we can guarantee that the manager is non-null AND that it has told
 * us that satellite is supported. Therefore, we don't expect exceptions to be thrown.
 *
 * Lastly, this class is designed to wait a full minute of process uptime before making any requests
 * Second, this class is designed to wait a full minute of process uptime before making any requests
 * to the satellite manager. The hope is that by waiting we don't have to retry due to a modem that
 * is still booting up or anything like that. We can tune or remove this behavior in the future if
 * necessary.
@@ -158,8 +150,6 @@ constructor(

    private val satelliteManager: SatelliteManager?

    override val isSatelliteAllowedForCurrentLocation: MutableStateFlow<Boolean>

    // Some calls into satellite manager will throw exceptions if it is not supported.
    // This is never expected to change after boot, but may need to be retried in some cases
    @get:VisibleForTesting
@@ -221,8 +211,6 @@ constructor(
    init {
        satelliteManager = satelliteManagerOpt.getOrNull()

        isSatelliteAllowedForCurrentLocation = MutableStateFlow(false)

        if (satelliteManager != null) {
            // Outer scope launch allows us to delay until MIN_UPTIME
            scope.launch {
@@ -233,10 +221,7 @@ constructor(
                    { "Checked for system support. support=$str1" },
                )

                // Second, launch a job to poll for service availability based on location
                scope.launch { pollForAvailabilityBasedOnLocation() }

                // Third, register a listener to let us know if there are changes to support
                // Second, register a listener to let us know if there are changes to support
                scope.launch { listenForChangesToSatelliteSupport(satelliteManager) }
            }
        } else {
@@ -259,28 +244,43 @@ constructor(
        return sm.checkSatelliteSupported()
    }

    /*
     * As there is no listener available for checking satellite allowed, we must poll the service.
     * Defaulting to polling at most once every 20m while active. Subsequent OOS events will restart
     * the job, so a flaky connection might cause more frequent checks.
     */
    private suspend fun pollForAvailabilityBasedOnLocation() {
    override val isSatelliteAllowedForCurrentLocation =
        satelliteSupport
            .whenSupported(
                supported = ::isSatelliteAllowedHasListener,
                supported = ::isSatelliteAvailableFlow,
                orElse = flowOf(false),
                retrySignal = telephonyProcessCrashedEvent,
            )
            .collectLatest { hasSubscribers ->
                if (hasSubscribers) {
                    while (true) {
                        logBuffer.i { "requestIsCommunicationAllowedForCurrentLocation" }
                        checkIsSatelliteAllowed()
                        delay(POLLING_INTERVAL_MS)
            .stateIn(scope, SharingStarted.Lazily, false)

    private fun isSatelliteAvailableFlow(sm: SupportedSatelliteManager): Flow<Boolean> =
        conflatedCallbackFlow {
                val callback = SatelliteCommunicationAllowedStateCallback { allowed ->
                    logBuffer.i({ bool1 = allowed }) {
                        "onSatelliteCommunicationAllowedStateChanged: $bool1"
                    }

                    trySend(allowed)
                }

                var registered = false
                try {
                    sm.registerForCommunicationAllowedStateChanged(
                        bgDispatcher.asExecutor(),
                        callback
                    )
                    registered = true
                } catch (e: Exception) {
                    logBuffer.e("Error calling registerForCommunicationAllowedStateChanged", e)
                }

                awaitClose {
                    if (registered) {
                        sm.unregisterForCommunicationAllowedStateChanged(callback)
                    }
                }
            }
            .flowOn(bgDispatcher)

    /**
     * Register a callback with [SatelliteManager] to let us know if there is a change in satellite
@@ -410,14 +410,6 @@ constructor(
            }
        }

    /**
     * Signal that we should start polling [checkIsSatelliteAllowed]. We only need to poll if there
     * are active listeners to [isSatelliteAllowedForCurrentLocation]
     */
    @SuppressWarnings("unused")
    private fun isSatelliteAllowedHasListener(sm: SupportedSatelliteManager): Flow<Boolean> =
        isSatelliteAllowedForCurrentLocation.subscriptionCount.map { it > 0 }.distinctUntilChanged()

    override val connectionState =
        satelliteSupport
            .whenSupported(
@@ -485,28 +477,6 @@ constructor(
            }
            .flowOn(bgDispatcher)

    /** Fire off a request to check for satellite availability. Always runs on the bg context */
    private suspend fun checkIsSatelliteAllowed() =
        withContext(bgDispatcher) {
            satelliteManager?.requestIsCommunicationAllowedForCurrentLocation(
                bgDispatcher.asExecutor(),
                object : OutcomeReceiver<Boolean, SatelliteManager.SatelliteException> {
                    override fun onError(e: SatelliteManager.SatelliteException) {
                        logBuffer.e(
                            "Found exception when checking availability",
                            e,
                        )
                        isSatelliteAllowedForCurrentLocation.value = false
                    }

                    override fun onResult(allowed: Boolean) {
                        logBuffer.i { "isSatelliteAllowedForCurrentLocation: $allowed" }
                        isSatelliteAllowedForCurrentLocation.value = allowed
                    }
                }
            )
        }

    private suspend fun SatelliteManager.checkSatelliteSupported(): SatelliteSupport =
        suspendCancellableCoroutine { continuation ->
            val cb =
@@ -546,9 +516,6 @@ constructor(
        }

    companion object {
        // TTL for satellite polling is twenty minutes
        const val POLLING_INTERVAL_MS: Long = 1000 * 60 * 20

        // Let the system boot up and stabilize before we check for system support
        const val MIN_UPTIME: Long = 1000 * 60

+63 −134
Original line number Diff line number Diff line
@@ -22,6 +22,7 @@ import android.telephony.TelephonyCallback
import android.telephony.TelephonyManager
import android.telephony.satellite.NtnSignalStrength
import android.telephony.satellite.NtnSignalStrengthCallback
import android.telephony.satellite.SatelliteCommunicationAllowedStateCallback
import android.telephony.satellite.SatelliteManager
import android.telephony.satellite.SatelliteManager.SATELLITE_MODEM_STATE_CONNECTED
import android.telephony.satellite.SatelliteManager.SATELLITE_MODEM_STATE_DATAGRAM_RETRYING
@@ -44,7 +45,6 @@ import com.android.systemui.coroutines.collectLastValue
import com.android.systemui.log.core.FakeLogBuffer
import com.android.systemui.statusbar.pipeline.mobile.data.repository.prod.MobileTelephonyHelpers
import com.android.systemui.statusbar.pipeline.satellite.data.prod.DeviceBasedSatelliteRepositoryImpl.Companion.MIN_UPTIME
import com.android.systemui.statusbar.pipeline.satellite.data.prod.DeviceBasedSatelliteRepositoryImpl.Companion.POLLING_INTERVAL_MS
import com.android.systemui.statusbar.pipeline.satellite.shared.model.SatelliteConnectionState
import com.android.systemui.util.mockito.any
import com.android.systemui.util.mockito.whenever
@@ -54,11 +54,8 @@ import com.google.common.truth.Truth.assertThat
import java.util.Optional
import kotlin.test.Test
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import org.junit.Before
@@ -71,6 +68,7 @@ import org.mockito.Mockito.never
import org.mockito.Mockito.times
import org.mockito.Mockito.verify
import org.mockito.MockitoAnnotations
import org.mockito.kotlin.doThrow

@Suppress("EXPERIMENTAL_IS_NOT_ENABLED")
@OptIn(ExperimentalCoroutinesApi::class)
@@ -186,149 +184,83 @@ class DeviceBasedSatelliteRepositoryImplTest : SysuiTestCase() {
        }

    @Test
    fun isSatelliteAllowed_readsSatelliteManagerState_enabled() =
    fun isSatelliteAllowed_listensToSatelliteManagerCallback() =
        testScope.runTest {
            setupDefaultRepo()
            // GIVEN satellite is allowed in this location
            val allowed = true

            doAnswer {
                    val receiver = it.arguments[1] as OutcomeReceiver<Boolean, SatelliteException>
                    receiver.onResult(allowed)
                    null
                }
                .`when`(satelliteManager)
                .requestIsCommunicationAllowedForCurrentLocation(
                    any(),
                    any<OutcomeReceiver<Boolean, SatelliteException>>()
                )

            val latest by collectLastValue(underTest.isSatelliteAllowedForCurrentLocation)
            runCurrent()

            assertThat(latest).isTrue()
            val callback =
                withArgCaptor<SatelliteCommunicationAllowedStateCallback> {
                    verify(satelliteManager)
                        .registerForCommunicationAllowedStateChanged(any(), capture())
                }

    @Test
    fun isSatelliteAllowed_readsSatelliteManagerState_disabled() =
        testScope.runTest {
            setupDefaultRepo()
            // GIVEN satellite is not allowed in this location
            val allowed = false
            // WHEN satellite manager says it's not available
            callback.onSatelliteCommunicationAllowedStateChanged(false)

            doAnswer {
                    val receiver = it.arguments[1] as OutcomeReceiver<Boolean, SatelliteException>
                    receiver.onResult(allowed)
                    null
                }
                .`when`(satelliteManager)
                .requestIsCommunicationAllowedForCurrentLocation(
                    any(),
                    any<OutcomeReceiver<Boolean, SatelliteException>>()
                )
            // THEN it's not!
            assertThat(latest).isFalse()

            val latest by collectLastValue(underTest.isSatelliteAllowedForCurrentLocation)
            // WHEN satellite manager says it's changed to available
            callback.onSatelliteCommunicationAllowedStateChanged(true)

            assertThat(latest).isFalse()
            // THEN it is!
            assertThat(latest).isTrue()
        }

    @Test
    fun isSatelliteAllowed_pollsOnTimeout() =
    fun isSatelliteAllowed_falseWhenErrorOccurs() =
        testScope.runTest {
            setupDefaultRepo()
            // GIVEN satellite is not allowed in this location
            var allowed = false

            doAnswer {
                    val receiver = it.arguments[1] as OutcomeReceiver<Boolean, SatelliteException>
                    receiver.onResult(allowed)
                    null
                }
            // GIVEN SatelliteManager gon' throw exceptions when we ask to register the callback
            doThrow(RuntimeException("Test exception"))
                .`when`(satelliteManager)
                .requestIsCommunicationAllowedForCurrentLocation(
                    any(),
                    any<OutcomeReceiver<Boolean, SatelliteException>>()
                )
                .registerForCommunicationAllowedStateChanged(any(), any())

            // WHEN the latest value is requested (and thus causes an exception to be thrown)
            val latest by collectLastValue(underTest.isSatelliteAllowedForCurrentLocation)

            // THEN the value is just false, and we didn't crash!
            assertThat(latest).isFalse()

            // WHEN satellite becomes enabled
            allowed = true

            // WHEN the timeout has not yet been reached
            advanceTimeBy(POLLING_INTERVAL_MS / 2)

            // THEN the value is still false
            assertThat(latest).isFalse()

            // WHEN time advances beyond the polling interval
            advanceTimeBy(POLLING_INTERVAL_MS / 2 + 1)

            // THEN then new value is emitted
            assertThat(latest).isTrue()
        }

    @Test
    fun isSatelliteAllowed_pollingRestartsWhenCollectionRestarts() =
    fun isSatelliteAllowed_reRegistersOnTelephonyProcessCrash() =
        testScope.runTest {
            setupDefaultRepo()
            // Use the old school launch/cancel so we can simulate subscribers arriving and leaving

            var latest: Boolean? = false
            var job =
                underTest.isSatelliteAllowedForCurrentLocation.onEach { latest = it }.launchIn(this)

            // GIVEN satellite is not allowed in this location
            var allowed = false
            val latest by collectLastValue(underTest.isSatelliteAllowedForCurrentLocation)
            runCurrent()

            doAnswer {
                    val receiver = it.arguments[1] as OutcomeReceiver<Boolean, SatelliteException>
                    receiver.onResult(allowed)
                    null
            val callback =
                withArgCaptor<SatelliteCommunicationAllowedStateCallback> {
                    verify(satelliteManager)
                        .registerForCommunicationAllowedStateChanged(any(), capture())
                }
                .`when`(satelliteManager)
                .requestIsCommunicationAllowedForCurrentLocation(
                    any(),
                    any<OutcomeReceiver<Boolean, SatelliteException>>()
                )

            assertThat(latest).isFalse()

            // WHEN satellite becomes enabled
            allowed = true

            // WHEN the job is restarted
            advanceTimeBy(POLLING_INTERVAL_MS / 2)
            val telephonyCallback =
                MobileTelephonyHelpers.getTelephonyCallbackForType<
                    TelephonyCallback.RadioPowerStateListener
                >(
                    telephonyManager
                )

            job.cancel()
            job =
                underTest.isSatelliteAllowedForCurrentLocation.onEach { latest = it }.launchIn(this)
            // GIVEN satellite is currently provisioned
            callback.onSatelliteCommunicationAllowedStateChanged(true)

            // THEN the value is re-fetched
            assertThat(latest).isTrue()

            job.cancel()
        }

    @Test
    fun isSatelliteAllowed_falseWhenErrorOccurs() =
        testScope.runTest {
            setupDefaultRepo()
            doAnswer {
                    val receiver = it.arguments[1] as OutcomeReceiver<Boolean, SatelliteException>
                    receiver.onError(SatelliteException(1 /* unused */))
                    null
                }
                .`when`(satelliteManager)
                .requestIsCommunicationAllowedForCurrentLocation(
                    any(),
                    any<OutcomeReceiver<Boolean, SatelliteException>>()
                )

            val latest by collectLastValue(underTest.isSatelliteAllowedForCurrentLocation)
            // WHEN a crash event happens (detected by radio state change)
            telephonyCallback.onRadioPowerStateChanged(TelephonyManager.RADIO_POWER_ON)
            runCurrent()
            telephonyCallback.onRadioPowerStateChanged(TelephonyManager.RADIO_POWER_OFF)
            runCurrent()

            assertThat(latest).isFalse()
            // THEN listener is re-registered
            verify(satelliteManager, times(2))
                .registerForCommunicationAllowedStateChanged(any(), any())
        }

    @Test
@@ -377,10 +309,7 @@ class DeviceBasedSatelliteRepositoryImplTest : SysuiTestCase() {
                    null
                }
                .whenever(satelliteManager)
                .requestIsProvisioned(
                    any(),
                    any<OutcomeReceiver<Boolean, SatelliteException>>()
                )
                .requestIsProvisioned(any(), any<OutcomeReceiver<Boolean, SatelliteException>>())

            // GIVEN we've been up long enough to start querying
            systemClock.setUptimeMillis(Process.getStartUptimeMillis() + MIN_UPTIME)