Loading core/java/com/android/internal/protolog/PerfettoProtoLogImpl.java +24 −37 Original line number Diff line number Diff line Loading @@ -46,8 +46,6 @@ import static android.internal.perfetto.protos.TracePacketOuterClass.TracePacket import android.annotation.NonNull; import android.annotation.Nullable; import android.internal.perfetto.protos.Protolog.ProtoLogViewerConfig.MessageData; import android.os.Handler; import android.os.HandlerThread; import android.os.RemoteException; import android.os.ServiceManager; import android.os.ShellCommand; Loading Loading @@ -85,7 +83,9 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; Loading Loading @@ -126,7 +126,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen /** * This set tracks active tracing instances from the perspective of the {@code * mBackgroundLoggingService}. It contains instance indexes, added when a tracing session starts * mSingleThreadedExecutor}. It contains instance indexes, added when a tracing session starts * and removed when it stops. This ensures that queued messages are traced only to the expected * tracing session. * Loading @@ -136,23 +136,18 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen * <li>Tracing messages logged after a session stops but still in the queue.</li> * </ul> * * <p>The set is modified on the single-threaded {@code mBackgroundLoggingService}, ensuring * <p>The set is modified on the single-threaded {@code mSingleThreadedExecutor}, ensuring * that the add/remove operations happen only after all messages in the queue at that point are * processed. */ @NonNull private final Set<Integer> mActiveTracingInstances = new ArraySet<>(); @NonNull private final HandlerThread mBackgroundThread; // Handler associated with the mBackgroundThread, ensuring tasks are processed sequentially // on a single background thread. This is crucial for operations like connecting to the // configuration service before other logging activities, and synchronizing queued // logging tasks on tracing start and stop. // A single-threaded executor to ensure that all background tasks are processed sequentially. // This is crucial for operations like connecting to the configuration service before other // logging activities, and synchronizing queued logging tasks on tracing start and stop. @VisibleForTesting @NonNull public final Handler mBackgroundHandler; public final ExecutorService mSingleThreadedExecutor = Executors.newSingleThreadExecutor(); // Set to true once this is ready to accept protolog to logcat requests. private boolean mLogcatReady = false; Loading @@ -173,10 +168,6 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen mCacheUpdater = cacheUpdater; mConfigurationService = configurationService; mBackgroundThread = new HandlerThread("ProtoLogBackground"); mBackgroundThread.start(); mBackgroundHandler = Handler.createAsync(mBackgroundThread.getLooper()); registerGroupsLocally(groups); } Loading Loading @@ -241,7 +232,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen Objects.requireNonNull(mConfigurationService, "A null ProtoLog Configuration Service was provided!"); mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { try { var args = createConfigurationServiceRegisterClientArgs(); args.groups = new String[groups.length]; Loading @@ -266,7 +257,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen Objects.requireNonNull(mConfigurationService, "A null ProtoLog Configuration Service was provided!"); mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { try { var args = new IProtoLogConfigurationService.RegisterGroupsArgs(); args.groups = new String[groups.length]; Loading Loading @@ -297,14 +288,14 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen disconnectFromConfigurationServiceAsync(); } mBackgroundThread.quitSafely(); mSingleThreadedExecutor.shutdown(); } private void disconnectFromConfigurationServiceAsync() { Objects.requireNonNull(mConfigurationService, "A null ProtoLog Configuration Service was provided!"); mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { try { mConfigurationService.unregisterClient(this); } catch (RemoteException e) { Loading Loading @@ -515,7 +506,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen snapshotMutableArgsToStringInPlace(args); } mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { try { logToProto(logLevel, group, message, args, tsNanos, stacktrace); } catch (RuntimeException e) { Loading Loading @@ -912,10 +903,10 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } // It is crucial to add the instanceIdx to mActiveTracingInstances via the // mBackgroundLoggingService. This ensures that this operation is enqueued // mSingleThreadedExecutor. This ensures that this operation is enqueued // and executed *after* any log messages that were submitted *before* this // tracing instance started. The check for mActiveTracingInstances.contains(instanceIdx) // happens within the logToProto method (which also runs on mBackgroundLoggingService). // happens within the logToProto method (which also runs on mSingleThreadedExecutor). // If we added instanceIdx directly, log messages already in the queue could be // incorrectly attributed to this new tracing session. queueTracingInstanceAddition(instanceIdx); Loading @@ -924,7 +915,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } private void queueTracingInstanceAddition(int instanceIdx) { mBackgroundHandler.post(() -> mActiveTracingInstances.add(instanceIdx)); mSingleThreadedExecutor.execute(() -> mActiveTracingInstances.add(instanceIdx)); } private void onTracingInstanceStartLocked(@NonNull ProtoLogDataSource.ProtoLogConfig config) { Loading Loading @@ -961,7 +952,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen Log.d(LOG_TAG, "Executing onTracingInstanceStop"); // Similar to onTracingInstanceStart, it's crucial to remove the instanceIdx // via the mBackgroundLoggingService. This ensures that the removal happens // via the mSingleThreadedExecutor. This ensures that the removal happens // *after* all log messages enqueued *before* this tracing instance was stopped // have been processed and had a chance to be included in this trace. // If we removed instanceIdx directly, log messages still in the queue that Loading @@ -980,7 +971,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } private void queueTracingInstanceRemoval(int instanceIdx) { mBackgroundHandler.post(() -> mActiveTracingInstances.remove(instanceIdx)); mSingleThreadedExecutor.execute(() -> mActiveTracingInstances.remove(instanceIdx)); } private void onTracingInstanceStopLocked(@NonNull ProtoLogDataSource.ProtoLogConfig config) { Loading Loading @@ -1074,16 +1065,12 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } private void waitForExistingBackgroundTasksToComplete() { final CountDownLatch latch = new CountDownLatch(1); mBackgroundHandler.post(() -> { Log.i(LOG_TAG, "Completed all pending background tasks"); latch.countDown(); }); try { latch.await(); } catch (InterruptedException e) { Log.e(LOG_TAG, "Failed to wait for tracing service background tasks to complete", e); Thread.currentThread().interrupt(); // Preserve interrupt status this.mSingleThreadedExecutor.submit(() -> { Log.i(LOG_TAG, "Completed all pending background tasks"); }).get(); } catch (InterruptedException | ExecutionException e) { Log.wtf(LOG_TAG, "Failed to wait for tracing service background tasks to complete", e); } } Loading core/java/com/android/internal/protolog/ProcessedPerfettoProtoLogImpl.java +1 −1 Original line number Diff line number Diff line Loading @@ -208,7 +208,7 @@ public class ProcessedPerfettoProtoLogImpl extends PerfettoProtoLogImpl { // Load in background to avoid delay in boot process. // The caveat is that any log message that is also logged to logcat will not be // successfully decoded until this completes. mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { mViewerConfigReader.loadViewerConfig(groupsLoggingToLogcat.toArray(new String[0])); readyToLogToLogcat(); }); Loading tests/Tracing/src/com/android/internal/protolog/ProcessedPerfettoProtoLogImplTest.java +6 −6 Original line number Diff line number Diff line Loading @@ -33,7 +33,6 @@ import static org.mockito.Mockito.when; import static java.io.File.createTempFile; import android.os.Handler; import android.os.SystemClock; import android.platform.test.annotations.Presubmit; import android.tools.ScenarioBuilder; Loading Loading @@ -72,6 +71,7 @@ import java.io.IOException; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; Loading Loading @@ -1002,7 +1002,7 @@ public class ProcessedPerfettoProtoLogImplTest { final CountDownLatch releaseExecutorLatch = new CountDownLatch(1); // Submit task to block the executor. sProtoLog.mBackgroundHandler.post(() -> { sProtoLog.mSingleThreadedExecutor.execute(() -> { executorBlockedLatch.countDown(); try { if (!releaseExecutorLatch.await(60, TimeUnit.SECONDS)) { Loading Loading @@ -1099,7 +1099,7 @@ public class ProcessedPerfettoProtoLogImplTest { sProtoLog.isProtoEnabled()); // Submit a task that will block the executor queue. sProtoLog.mBackgroundHandler.post(() -> { sProtoLog.mSingleThreadedExecutor.execute(() -> { try { blockingTaskStartedExecution.set(true); processingHasStartedLatch.countDown(); // Signal that this task has started Loading Loading @@ -1141,7 +1141,7 @@ public class ProcessedPerfettoProtoLogImplTest { allowProcessingToContinueLatch.countDown(); // Stop tracing immediately. The implementation should wait for the // mBackgroundHandler to process all queued messages (including the // mSingleThreadedExecutor to process all queued messages (including the // now-unblocked first task and all subsequent log messages). } finally { // Ensure the latch is always counted down if an exception occurred before stop, Loading Loading @@ -1179,11 +1179,11 @@ public class ProcessedPerfettoProtoLogImplTest { final StringBuilder mutableArg = new StringBuilder(initialValue); final String logMessageFormat = "Test with mutable arg: %s"; final Handler backgroundHandler = sProtoLog.mBackgroundHandler; final ExecutorService backgroundHandler = sProtoLog.mSingleThreadedExecutor; // Task to pause the background thread. final CountDownLatch backgroundThreadPausedLatch = new CountDownLatch(1); backgroundHandler.post(() -> { backgroundHandler.execute(() -> { try { backgroundThreadPausedLatch.await(); } catch (InterruptedException e) { Loading Loading
core/java/com/android/internal/protolog/PerfettoProtoLogImpl.java +24 −37 Original line number Diff line number Diff line Loading @@ -46,8 +46,6 @@ import static android.internal.perfetto.protos.TracePacketOuterClass.TracePacket import android.annotation.NonNull; import android.annotation.Nullable; import android.internal.perfetto.protos.Protolog.ProtoLogViewerConfig.MessageData; import android.os.Handler; import android.os.HandlerThread; import android.os.RemoteException; import android.os.ServiceManager; import android.os.ShellCommand; Loading Loading @@ -85,7 +83,9 @@ import java.util.Objects; import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; Loading Loading @@ -126,7 +126,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen /** * This set tracks active tracing instances from the perspective of the {@code * mBackgroundLoggingService}. It contains instance indexes, added when a tracing session starts * mSingleThreadedExecutor}. It contains instance indexes, added when a tracing session starts * and removed when it stops. This ensures that queued messages are traced only to the expected * tracing session. * Loading @@ -136,23 +136,18 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen * <li>Tracing messages logged after a session stops but still in the queue.</li> * </ul> * * <p>The set is modified on the single-threaded {@code mBackgroundLoggingService}, ensuring * <p>The set is modified on the single-threaded {@code mSingleThreadedExecutor}, ensuring * that the add/remove operations happen only after all messages in the queue at that point are * processed. */ @NonNull private final Set<Integer> mActiveTracingInstances = new ArraySet<>(); @NonNull private final HandlerThread mBackgroundThread; // Handler associated with the mBackgroundThread, ensuring tasks are processed sequentially // on a single background thread. This is crucial for operations like connecting to the // configuration service before other logging activities, and synchronizing queued // logging tasks on tracing start and stop. // A single-threaded executor to ensure that all background tasks are processed sequentially. // This is crucial for operations like connecting to the configuration service before other // logging activities, and synchronizing queued logging tasks on tracing start and stop. @VisibleForTesting @NonNull public final Handler mBackgroundHandler; public final ExecutorService mSingleThreadedExecutor = Executors.newSingleThreadExecutor(); // Set to true once this is ready to accept protolog to logcat requests. private boolean mLogcatReady = false; Loading @@ -173,10 +168,6 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen mCacheUpdater = cacheUpdater; mConfigurationService = configurationService; mBackgroundThread = new HandlerThread("ProtoLogBackground"); mBackgroundThread.start(); mBackgroundHandler = Handler.createAsync(mBackgroundThread.getLooper()); registerGroupsLocally(groups); } Loading Loading @@ -241,7 +232,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen Objects.requireNonNull(mConfigurationService, "A null ProtoLog Configuration Service was provided!"); mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { try { var args = createConfigurationServiceRegisterClientArgs(); args.groups = new String[groups.length]; Loading @@ -266,7 +257,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen Objects.requireNonNull(mConfigurationService, "A null ProtoLog Configuration Service was provided!"); mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { try { var args = new IProtoLogConfigurationService.RegisterGroupsArgs(); args.groups = new String[groups.length]; Loading Loading @@ -297,14 +288,14 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen disconnectFromConfigurationServiceAsync(); } mBackgroundThread.quitSafely(); mSingleThreadedExecutor.shutdown(); } private void disconnectFromConfigurationServiceAsync() { Objects.requireNonNull(mConfigurationService, "A null ProtoLog Configuration Service was provided!"); mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { try { mConfigurationService.unregisterClient(this); } catch (RemoteException e) { Loading Loading @@ -515,7 +506,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen snapshotMutableArgsToStringInPlace(args); } mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { try { logToProto(logLevel, group, message, args, tsNanos, stacktrace); } catch (RuntimeException e) { Loading Loading @@ -912,10 +903,10 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } // It is crucial to add the instanceIdx to mActiveTracingInstances via the // mBackgroundLoggingService. This ensures that this operation is enqueued // mSingleThreadedExecutor. This ensures that this operation is enqueued // and executed *after* any log messages that were submitted *before* this // tracing instance started. The check for mActiveTracingInstances.contains(instanceIdx) // happens within the logToProto method (which also runs on mBackgroundLoggingService). // happens within the logToProto method (which also runs on mSingleThreadedExecutor). // If we added instanceIdx directly, log messages already in the queue could be // incorrectly attributed to this new tracing session. queueTracingInstanceAddition(instanceIdx); Loading @@ -924,7 +915,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } private void queueTracingInstanceAddition(int instanceIdx) { mBackgroundHandler.post(() -> mActiveTracingInstances.add(instanceIdx)); mSingleThreadedExecutor.execute(() -> mActiveTracingInstances.add(instanceIdx)); } private void onTracingInstanceStartLocked(@NonNull ProtoLogDataSource.ProtoLogConfig config) { Loading Loading @@ -961,7 +952,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen Log.d(LOG_TAG, "Executing onTracingInstanceStop"); // Similar to onTracingInstanceStart, it's crucial to remove the instanceIdx // via the mBackgroundLoggingService. This ensures that the removal happens // via the mSingleThreadedExecutor. This ensures that the removal happens // *after* all log messages enqueued *before* this tracing instance was stopped // have been processed and had a chance to be included in this trace. // If we removed instanceIdx directly, log messages still in the queue that Loading @@ -980,7 +971,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } private void queueTracingInstanceRemoval(int instanceIdx) { mBackgroundHandler.post(() -> mActiveTracingInstances.remove(instanceIdx)); mSingleThreadedExecutor.execute(() -> mActiveTracingInstances.remove(instanceIdx)); } private void onTracingInstanceStopLocked(@NonNull ProtoLogDataSource.ProtoLogConfig config) { Loading Loading @@ -1074,16 +1065,12 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } private void waitForExistingBackgroundTasksToComplete() { final CountDownLatch latch = new CountDownLatch(1); mBackgroundHandler.post(() -> { Log.i(LOG_TAG, "Completed all pending background tasks"); latch.countDown(); }); try { latch.await(); } catch (InterruptedException e) { Log.e(LOG_TAG, "Failed to wait for tracing service background tasks to complete", e); Thread.currentThread().interrupt(); // Preserve interrupt status this.mSingleThreadedExecutor.submit(() -> { Log.i(LOG_TAG, "Completed all pending background tasks"); }).get(); } catch (InterruptedException | ExecutionException e) { Log.wtf(LOG_TAG, "Failed to wait for tracing service background tasks to complete", e); } } Loading
core/java/com/android/internal/protolog/ProcessedPerfettoProtoLogImpl.java +1 −1 Original line number Diff line number Diff line Loading @@ -208,7 +208,7 @@ public class ProcessedPerfettoProtoLogImpl extends PerfettoProtoLogImpl { // Load in background to avoid delay in boot process. // The caveat is that any log message that is also logged to logcat will not be // successfully decoded until this completes. mBackgroundHandler.post(() -> { mSingleThreadedExecutor.execute(() -> { mViewerConfigReader.loadViewerConfig(groupsLoggingToLogcat.toArray(new String[0])); readyToLogToLogcat(); }); Loading
tests/Tracing/src/com/android/internal/protolog/ProcessedPerfettoProtoLogImplTest.java +6 −6 Original line number Diff line number Diff line Loading @@ -33,7 +33,6 @@ import static org.mockito.Mockito.when; import static java.io.File.createTempFile; import android.os.Handler; import android.os.SystemClock; import android.platform.test.annotations.Presubmit; import android.tools.ScenarioBuilder; Loading Loading @@ -72,6 +71,7 @@ import java.io.IOException; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; Loading Loading @@ -1002,7 +1002,7 @@ public class ProcessedPerfettoProtoLogImplTest { final CountDownLatch releaseExecutorLatch = new CountDownLatch(1); // Submit task to block the executor. sProtoLog.mBackgroundHandler.post(() -> { sProtoLog.mSingleThreadedExecutor.execute(() -> { executorBlockedLatch.countDown(); try { if (!releaseExecutorLatch.await(60, TimeUnit.SECONDS)) { Loading Loading @@ -1099,7 +1099,7 @@ public class ProcessedPerfettoProtoLogImplTest { sProtoLog.isProtoEnabled()); // Submit a task that will block the executor queue. sProtoLog.mBackgroundHandler.post(() -> { sProtoLog.mSingleThreadedExecutor.execute(() -> { try { blockingTaskStartedExecution.set(true); processingHasStartedLatch.countDown(); // Signal that this task has started Loading Loading @@ -1141,7 +1141,7 @@ public class ProcessedPerfettoProtoLogImplTest { allowProcessingToContinueLatch.countDown(); // Stop tracing immediately. The implementation should wait for the // mBackgroundHandler to process all queued messages (including the // mSingleThreadedExecutor to process all queued messages (including the // now-unblocked first task and all subsequent log messages). } finally { // Ensure the latch is always counted down if an exception occurred before stop, Loading Loading @@ -1179,11 +1179,11 @@ public class ProcessedPerfettoProtoLogImplTest { final StringBuilder mutableArg = new StringBuilder(initialValue); final String logMessageFormat = "Test with mutable arg: %s"; final Handler backgroundHandler = sProtoLog.mBackgroundHandler; final ExecutorService backgroundHandler = sProtoLog.mSingleThreadedExecutor; // Task to pause the background thread. final CountDownLatch backgroundThreadPausedLatch = new CountDownLatch(1); backgroundHandler.post(() -> { backgroundHandler.execute(() -> { try { backgroundThreadPausedLatch.await(); } catch (InterruptedException e) { Loading