Loading core/java/com/android/internal/protolog/PerfettoProtoLogImpl.java +30 −30 Original line number Diff line number Diff line Loading @@ -57,7 +57,6 @@ import android.tracing.perfetto.TracingContext; import android.util.ArrayMap; import android.util.ArraySet; import android.util.Log; import android.util.LongArray; import android.util.Slog; import android.util.proto.ProtoOutputStream; Loading @@ -82,7 +81,8 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; Loading Loading @@ -118,7 +118,21 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen private final ReadWriteLock mConfigUpdaterLock = new ReentrantReadWriteLock(); private final Lock mBackgroundServiceLock = new ReentrantLock(); protected ExecutorService mBackgroundLoggingService = Executors.newSingleThreadExecutor(); // NOTE: This is a single-thread executor configured with a ThreadPoolExecutor and a // LinkedBlockingQueue. This ensures that tasks are executed in FIFO (First-In, First-Out) // order, which is crucial for operations like connecting to the configuration service before // other logging activities, and synchronizing queued logging tasks on tracing start and stop. // Configuration: // corePoolSize: 1 (single thread) // maximumPoolSize: 1 (single thread) // keepAliveTime: 0L (threads do not time out) // workQueue: LinkedBlockingQueue (unbounded, FIFO) @VisibleForTesting public final ExecutorService mBackgroundLoggingService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); // Set to true once this is ready to accept protolog to logcat requests. private boolean mLogcatReady = false; Loading Loading @@ -399,26 +413,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen private void onTracingFlush() { Log.d(LOG_TAG, "Executing onTracingFlush"); final ExecutorService loggingService; mBackgroundServiceLock.lock(); try { loggingService = mBackgroundLoggingService; mBackgroundLoggingService = Executors.newSingleThreadExecutor(); } finally { mBackgroundServiceLock.unlock(); } try { loggingService.shutdown(); boolean finished = loggingService.awaitTermination(10, TimeUnit.SECONDS); if (!finished) { Log.e(LOG_TAG, "ProtoLog background tracing service didn't finish gracefully."); } } catch (InterruptedException e) { Log.e(LOG_TAG, "Failed to wait for tracing to finish", e); } waitForExistingBackgroundTasksToComplete(); if (!android.tracing.Flags.clientSideProtoLogging()) { dumpViewerConfig(); Loading Loading @@ -897,6 +892,16 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } } private void waitForExistingBackgroundTasksToComplete() { try { this.mBackgroundLoggingService.submit(() -> { Log.i(LOG_TAG, "Completed all pending background tasks"); }).get(); } catch (InterruptedException | ExecutionException e) { Log.e(LOG_TAG, "Failed to wait for tracing service background tasks to complete", e); } } /** * This is only used by unit tests to wait until {@link #connectToConfigurationService} is * done. Because unit tests are sensitive to concurrent accesses. Loading @@ -907,13 +912,8 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen if (!(currentInstance instanceof PerfettoProtoLogImpl protoLog)) { return; } try { protoLog.mBackgroundLoggingService.submit(() -> { Log.i(LOG_TAG, "Complete initialization"); }).get(); } catch (InterruptedException | ExecutionException e) { Log.e(LOG_TAG, "Failed to wait for tracing service", e); } protoLog.waitForExistingBackgroundTasksToComplete(); } } tests/Tracing/src/com/android/internal/protolog/ProcessedPerfettoProtoLogImplTest.java +87 −0 Original line number Diff line number Diff line Loading @@ -68,6 +68,9 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** Loading Loading @@ -953,6 +956,90 @@ public class ProcessedPerfettoProtoLogImplTest { Truth.assertThat(TestProtoLogGroup.TEST_GROUP.isLogToLogcat()).isFalse(); } @Test public void processesAllPendingMessagesBeforeTraceStop() throws IOException, InterruptedException { // large number of messages to log to stress the queue final int numMessages = 1000; final CountDownLatch processingHasStartedLatch = new CountDownLatch(1); final CountDownLatch allowProcessingToContinueLatch = new CountDownLatch(1); final AtomicBoolean blockingTaskStartedExecution = new AtomicBoolean(false); // Configure trace monitor to enable all log levels for the test data source. PerfettoTraceMonitor traceMonitor = PerfettoTraceMonitor.newBuilder() .enableProtoLog(true, List.of(), TEST_PROTOLOG_DATASOURCE_NAME) .build(); try { traceMonitor.start(); assertTrue("ProtoLog should be enabled after starting the trace.", sProtoLog.isProtoEnabled()); // Submit a task that will block the executor queue. sProtoLog.mBackgroundLoggingService.execute(() -> { try { blockingTaskStartedExecution.set(true); processingHasStartedLatch.countDown(); // Signal that this task has started // Wait until the main test thread signals to continue if (!allowProcessingToContinueLatch.await(60, TimeUnit.SECONDS)) { // Fail fast if timeout occurs, to avoid test hanging indefinitely Truth.assertWithMessage( "Timeout waiting for allowProcessingToContinueLatch") .fail(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Blocking task was interrupted: " + e.getMessage()); } }); // Wait for the blocking task to actually start executing on the background thread. // This ensures it's at the head of the executor's queue before we add more tasks. assertTrue("Blocking task did not start execution in time.", processingHasStartedLatch.await(5, TimeUnit.SECONDS)); // Now, submit all the log messages. They will be queued behind the blocking task. for (int i = 0; i < numMessages; i++) { sProtoLog.log(LogLevel.DEBUG, TestProtoLogGroup.TEST_GROUP, 1, LogDataType.BOOLEAN, new Object[]{true}); } // Assert that the blocking task is still active (i.e., waiting on the latch), // which implies the subsequently submitted log messages are still queued. assertTrue("Blocking task should have started execution.", blockingTaskStartedExecution.get()); Truth.assertWithMessage( "allowProcessingToContinueLatch should not have been counted down yet.") .that(allowProcessingToContinueLatch.getCount()) .isEqualTo(1L); // Allow the blocking task to complete. This will allow the executor // to start processing the queued log messages. allowProcessingToContinueLatch.countDown(); // Stop tracing immediately. The implementation should wait for the // mBackgroundLoggingService to process all queued messages (including the // now-unblocked first task and all subsequent log messages). } finally { // Ensure the latch is always counted down if an exception occurred before stop, // or if the test is ending, to prevent the background thread from hanging. if (allowProcessingToContinueLatch.getCount() > 0) { allowProcessingToContinueLatch.countDown(); } traceMonitor.stop(mWriter); } // Verify that all messages were written to the trace. final ResultReader reader = new ResultReader(mWriter.write(), mTraceConfig); final ProtoLogTrace protolog = reader.readProtoLogTrace(); Truth.assertThat(protolog.messages).hasSize(numMessages); for (int i = 0; i < numMessages; i++) { Truth.assertThat(protolog.messages.get(i).getLevel()).isEqualTo(LogLevel.DEBUG); Truth.assertThat(protolog.messages.get(i).getMessage()) .isEqualTo("My Test Debug Log Message true"); } } private enum TestProtoLogGroup implements IProtoLogGroup { TEST_GROUP(true, true, false, "TEST_TAG"); Loading Loading
core/java/com/android/internal/protolog/PerfettoProtoLogImpl.java +30 −30 Original line number Diff line number Diff line Loading @@ -57,7 +57,6 @@ import android.tracing.perfetto.TracingContext; import android.util.ArrayMap; import android.util.ArraySet; import android.util.Log; import android.util.LongArray; import android.util.Slog; import android.util.proto.ProtoOutputStream; Loading @@ -82,7 +81,8 @@ import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; Loading Loading @@ -118,7 +118,21 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen private final ReadWriteLock mConfigUpdaterLock = new ReentrantReadWriteLock(); private final Lock mBackgroundServiceLock = new ReentrantLock(); protected ExecutorService mBackgroundLoggingService = Executors.newSingleThreadExecutor(); // NOTE: This is a single-thread executor configured with a ThreadPoolExecutor and a // LinkedBlockingQueue. This ensures that tasks are executed in FIFO (First-In, First-Out) // order, which is crucial for operations like connecting to the configuration service before // other logging activities, and synchronizing queued logging tasks on tracing start and stop. // Configuration: // corePoolSize: 1 (single thread) // maximumPoolSize: 1 (single thread) // keepAliveTime: 0L (threads do not time out) // workQueue: LinkedBlockingQueue (unbounded, FIFO) @VisibleForTesting public final ExecutorService mBackgroundLoggingService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>()); // Set to true once this is ready to accept protolog to logcat requests. private boolean mLogcatReady = false; Loading Loading @@ -399,26 +413,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen private void onTracingFlush() { Log.d(LOG_TAG, "Executing onTracingFlush"); final ExecutorService loggingService; mBackgroundServiceLock.lock(); try { loggingService = mBackgroundLoggingService; mBackgroundLoggingService = Executors.newSingleThreadExecutor(); } finally { mBackgroundServiceLock.unlock(); } try { loggingService.shutdown(); boolean finished = loggingService.awaitTermination(10, TimeUnit.SECONDS); if (!finished) { Log.e(LOG_TAG, "ProtoLog background tracing service didn't finish gracefully."); } } catch (InterruptedException e) { Log.e(LOG_TAG, "Failed to wait for tracing to finish", e); } waitForExistingBackgroundTasksToComplete(); if (!android.tracing.Flags.clientSideProtoLogging()) { dumpViewerConfig(); Loading Loading @@ -897,6 +892,16 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen } } private void waitForExistingBackgroundTasksToComplete() { try { this.mBackgroundLoggingService.submit(() -> { Log.i(LOG_TAG, "Completed all pending background tasks"); }).get(); } catch (InterruptedException | ExecutionException e) { Log.e(LOG_TAG, "Failed to wait for tracing service background tasks to complete", e); } } /** * This is only used by unit tests to wait until {@link #connectToConfigurationService} is * done. Because unit tests are sensitive to concurrent accesses. Loading @@ -907,13 +912,8 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen if (!(currentInstance instanceof PerfettoProtoLogImpl protoLog)) { return; } try { protoLog.mBackgroundLoggingService.submit(() -> { Log.i(LOG_TAG, "Complete initialization"); }).get(); } catch (InterruptedException | ExecutionException e) { Log.e(LOG_TAG, "Failed to wait for tracing service", e); } protoLog.waitForExistingBackgroundTasksToComplete(); } }
tests/Tracing/src/com/android/internal/protolog/ProcessedPerfettoProtoLogImplTest.java +87 −0 Original line number Diff line number Diff line Loading @@ -68,6 +68,9 @@ import java.io.File; import java.io.IOException; import java.util.List; import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; /** Loading Loading @@ -953,6 +956,90 @@ public class ProcessedPerfettoProtoLogImplTest { Truth.assertThat(TestProtoLogGroup.TEST_GROUP.isLogToLogcat()).isFalse(); } @Test public void processesAllPendingMessagesBeforeTraceStop() throws IOException, InterruptedException { // large number of messages to log to stress the queue final int numMessages = 1000; final CountDownLatch processingHasStartedLatch = new CountDownLatch(1); final CountDownLatch allowProcessingToContinueLatch = new CountDownLatch(1); final AtomicBoolean blockingTaskStartedExecution = new AtomicBoolean(false); // Configure trace monitor to enable all log levels for the test data source. PerfettoTraceMonitor traceMonitor = PerfettoTraceMonitor.newBuilder() .enableProtoLog(true, List.of(), TEST_PROTOLOG_DATASOURCE_NAME) .build(); try { traceMonitor.start(); assertTrue("ProtoLog should be enabled after starting the trace.", sProtoLog.isProtoEnabled()); // Submit a task that will block the executor queue. sProtoLog.mBackgroundLoggingService.execute(() -> { try { blockingTaskStartedExecution.set(true); processingHasStartedLatch.countDown(); // Signal that this task has started // Wait until the main test thread signals to continue if (!allowProcessingToContinueLatch.await(60, TimeUnit.SECONDS)) { // Fail fast if timeout occurs, to avoid test hanging indefinitely Truth.assertWithMessage( "Timeout waiting for allowProcessingToContinueLatch") .fail(); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException("Blocking task was interrupted: " + e.getMessage()); } }); // Wait for the blocking task to actually start executing on the background thread. // This ensures it's at the head of the executor's queue before we add more tasks. assertTrue("Blocking task did not start execution in time.", processingHasStartedLatch.await(5, TimeUnit.SECONDS)); // Now, submit all the log messages. They will be queued behind the blocking task. for (int i = 0; i < numMessages; i++) { sProtoLog.log(LogLevel.DEBUG, TestProtoLogGroup.TEST_GROUP, 1, LogDataType.BOOLEAN, new Object[]{true}); } // Assert that the blocking task is still active (i.e., waiting on the latch), // which implies the subsequently submitted log messages are still queued. assertTrue("Blocking task should have started execution.", blockingTaskStartedExecution.get()); Truth.assertWithMessage( "allowProcessingToContinueLatch should not have been counted down yet.") .that(allowProcessingToContinueLatch.getCount()) .isEqualTo(1L); // Allow the blocking task to complete. This will allow the executor // to start processing the queued log messages. allowProcessingToContinueLatch.countDown(); // Stop tracing immediately. The implementation should wait for the // mBackgroundLoggingService to process all queued messages (including the // now-unblocked first task and all subsequent log messages). } finally { // Ensure the latch is always counted down if an exception occurred before stop, // or if the test is ending, to prevent the background thread from hanging. if (allowProcessingToContinueLatch.getCount() > 0) { allowProcessingToContinueLatch.countDown(); } traceMonitor.stop(mWriter); } // Verify that all messages were written to the trace. final ResultReader reader = new ResultReader(mWriter.write(), mTraceConfig); final ProtoLogTrace protolog = reader.readProtoLogTrace(); Truth.assertThat(protolog.messages).hasSize(numMessages); for (int i = 0; i < numMessages; i++) { Truth.assertThat(protolog.messages.get(i).getLevel()).isEqualTo(LogLevel.DEBUG); Truth.assertThat(protolog.messages.get(i).getMessage()) .isEqualTo("My Test Debug Log Message true"); } } private enum TestProtoLogGroup implements IProtoLogGroup { TEST_GROUP(true, true, false, "TEST_TAG"); Loading