Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 50a66a22 authored by Treehugger Robot's avatar Treehugger Robot Committed by Android (Google) Code Review
Browse files

Merge "Add utilities to trace callbackFlow" into main

parents b3d1e84c 92af0aa1
Loading
Loading
Loading
Loading
+77 −2
Original line number Diff line number Diff line
@@ -13,14 +13,27 @@
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
@file:OptIn(ExperimentalTypeInference::class)

package com.android.app.tracing

import com.android.app.tracing.TraceUtils.traceAsync
import java.util.concurrent.atomic.AtomicInteger
import kotlin.experimental.ExperimentalTypeInference
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.onEach

/** Utilities to trace Flows */
object FlowTracing {

    private const val TAG = "FlowTracing"
    private const val DEFAULT_ASYNC_TRACK_NAME = TAG
    private val counter = AtomicInteger(0)

    /** Logs each flow element to a trace. */
    inline fun <T> Flow<T>.traceEach(
        flowName: String,
@@ -33,12 +46,74 @@ object FlowTracing {
        return baseFlow.onEach { stateLogger.log(valueToString(it)) }
    }

    fun <T> Flow<T>.traceEmissionCount(flowName: String): Flow<T> {
        val trackName = "$flowName#emissionCount"
    /** Adds a counter track to monitor emissions from a specific flow.] */
    fun <T> Flow<T>.traceEmissionCount(flowName: String, uniqueSuffix: Boolean = false): Flow<T> {
        val trackName by lazy {
            "$flowName#emissionCount" + if (uniqueSuffix) "\$${counter.addAndGet(1)}" else ""
        }
        var count = 0
        return onEach {
            count += 1
            traceCounter(trackName, count)
        }
    }

    /**
     * Adds a counter track to monitor emissions from a specific flow.
     *
     * [flowName] is lazy: it would be computed only if tracing is enabled and only the first time.
     */
    fun <T> Flow<T>.traceEmissionCount(
        flowName: () -> String,
        uniqueSuffix: Boolean = false
    ): Flow<T> {
        val trackName by lazy {
            "${flowName()}#emissionCount" + if (uniqueSuffix) "\$${counter.addAndGet(1)}" else ""
        }
        var count = 0
        return onEach {
            count += 1
            if (isEnabled()) {
                traceCounter(trackName, count)
            }
        }
    }

    /**
     * Makes [awaitClose] output Perfetto traces.
     *
     * There will be 2 traces:
     * - One in the thread this is being executed on
     * - One in a track having [DEFAULT_ASYNC_TRACK_NAME] name.
     *
     * This allows to easily have visibility into what's happening in awaitClose.
     */
    suspend fun ProducerScope<*>.tracedAwaitClose(name: String, block: () -> Unit = {}) {
        awaitClose {
            val traceName = { "$name#TracedAwaitClose" }
            traceAsync(DEFAULT_ASYNC_TRACK_NAME, traceName) { traceSection(traceName) { block() } }
        }
    }

    /**
     * Traced version of [callbackFlow].
     *
     * Adds tracing in 2 ways:
     * - An async slice will appear in the [DEFAULT_ASYNC_TRACK_NAME] named track.
     * - A counter will be increased at every emission
     *
     * Should be used with [tracedAwaitClose] (when needed).
     */
    fun <T> tracedConflatedCallbackFlow(
        name: String,
        @BuilderInference block: suspend ProducerScope<T>.() -> Unit
    ): Flow<T> {
        return callbackFlow {
                traceAsync(DEFAULT_ASYNC_TRACK_NAME, { "$name#CallbackFlowBlock" }) {
                    block(this@callbackFlow)
                }
            }
            .conflate()
            .traceEmissionCount(name, uniqueSuffix = true)
    }
}
+26 −0
Original line number Diff line number Diff line
@@ -146,6 +146,32 @@ object TraceUtils {
    inline fun <T> traceAsync(method: String, block: () -> T): T =
        traceAsync(DEFAULT_TRACK_NAME, method, block)

    /** Creates an async slice in the default track. */
    @JvmStatic
    inline fun <T> traceAsync(tag: () -> String, block: () -> T): T {
        val tracingEnabled = isEnabled()
        return if (tracingEnabled) {
            traceAsync(DEFAULT_TRACK_NAME, tag(), block)
        } else {
            block()
        }
    }

    /**
     * Creates an async slice in the default track.
     *
     * The [tag] is computed only if tracing is enabled. See [traceAsync].
     */
    @JvmStatic
    inline fun <T> traceAsync(trackName: String, tag: () -> String, block: () -> T): T {
        val tracingEnabled = isEnabled()
        return if (tracingEnabled) {
            traceAsync(trackName, tag(), block)
        } else {
            block()
        }
    }

    /**
     * Creates an async slice in a track with [trackName] while [block] runs.
     *