Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f0dbd596 authored by Nicolo' Mazzucato's avatar Nicolo' Mazzucato Committed by Automerger Merge Worker
Browse files

Add utilities to trace callbackFlow am: 26905304

parents de927529 26905304
Loading
Loading
Loading
Loading
+77 −2
Original line number Diff line number Diff line
@@ -13,14 +13,27 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
@file:OptIn(ExperimentalTypeInference::class)

package com.android.app.tracing

import com.android.app.tracing.TraceUtils.traceAsync
import java.util.concurrent.atomic.AtomicInteger
import kotlin.experimental.ExperimentalTypeInference
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.onEach

/** Utilities to trace Flows */
object FlowTracing {

    private const val TAG = "FlowTracing"
    private const val DEFAULT_ASYNC_TRACK_NAME = TAG
    private val counter = AtomicInteger(0)

    /** Logs each flow element to a trace. */
    inline fun <T> Flow<T>.traceEach(
        flowName: String,
@@ -33,12 +46,74 @@ object FlowTracing {
        return baseFlow.onEach { stateLogger.log(valueToString(it)) }
    }

    fun <T> Flow<T>.traceEmissionCount(flowName: String): Flow<T> {
        val trackName = "$flowName#emissionCount"
    /** Adds a counter track to monitor emissions from a specific flow.] */
    fun <T> Flow<T>.traceEmissionCount(flowName: String, uniqueSuffix: Boolean = false): Flow<T> {
        val trackName by lazy {
            "$flowName#emissionCount" + if (uniqueSuffix) "\$${counter.addAndGet(1)}" else ""
        }
        var count = 0
        return onEach {
            count += 1
            traceCounter(trackName, count)
        }
    }

    /**
     * Adds a counter track to monitor emissions from a specific flow.
     *
     * [flowName] is lazy: it would be computed only if tracing is enabled and only the first time.
     */
    fun <T> Flow<T>.traceEmissionCount(
        flowName: () -> String,
        uniqueSuffix: Boolean = false
    ): Flow<T> {
        val trackName by lazy {
            "${flowName()}#emissionCount" + if (uniqueSuffix) "\$${counter.addAndGet(1)}" else ""
        }
        var count = 0
        return onEach {
            count += 1
            if (isEnabled()) {
                traceCounter(trackName, count)
            }
        }
    }

    /**
     * Makes [awaitClose] output Perfetto traces.
     *
     * There will be 2 traces:
     * - One in the thread this is being executed on
     * - One in a track having [DEFAULT_ASYNC_TRACK_NAME] name.
     *
     * This allows to easily have visibility into what's happening in awaitClose.
     */
    suspend fun ProducerScope<*>.tracedAwaitClose(name: String, block: () -> Unit = {}) {
        awaitClose {
            val traceName = { "$name#TracedAwaitClose" }
            traceAsync(DEFAULT_ASYNC_TRACK_NAME, traceName) { traceSection(traceName) { block() } }
        }
    }

    /**
     * Traced version of [callbackFlow].
     *
     * Adds tracing in 2 ways:
     * - An async slice will appear in the [DEFAULT_ASYNC_TRACK_NAME] named track.
     * - A counter will be increased at every emission
     *
     * Should be used with [tracedAwaitClose] (when needed).
     */
    fun <T> tracedConflatedCallbackFlow(
        name: String,
        @BuilderInference block: suspend ProducerScope<T>.() -> Unit
    ): Flow<T> {
        return callbackFlow {
                traceAsync(DEFAULT_ASYNC_TRACK_NAME, { "$name#CallbackFlowBlock" }) {
                    block(this@callbackFlow)
                }
            }
            .conflate()
            .traceEmissionCount(name, uniqueSuffix = true)
    }
}
+26 −0
Original line number Diff line number Diff line
@@ -146,6 +146,32 @@ object TraceUtils {
    inline fun <T> traceAsync(method: String, block: () -> T): T =
        traceAsync(DEFAULT_TRACK_NAME, method, block)

    /** Creates an async slice in the default track. */
    @JvmStatic
    inline fun <T> traceAsync(tag: () -> String, block: () -> T): T {
        val tracingEnabled = isEnabled()
        return if (tracingEnabled) {
            traceAsync(DEFAULT_TRACK_NAME, tag(), block)
        } else {
            block()
        }
    }

    /**
     * Creates an async slice in the default track.
     *
     * The [tag] is computed only if tracing is enabled. See [traceAsync].
     */
    @JvmStatic
    inline fun <T> traceAsync(trackName: String, tag: () -> String, block: () -> T): T {
        val tracingEnabled = isEnabled()
        return if (tracingEnabled) {
            traceAsync(trackName, tag(), block)
        } else {
            block()
        }
    }

    /**
     * Creates an async slice in a track with [trackName] while [block] runs.
     *