Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 11dd1a77 authored by Pablo Gamito's avatar Pablo Gamito
Browse files

Submit task to queue to sync ProtoLog stop

Instead of stopping and re-creating the executor everytime we can just rely on the FIFO nature of the SingleThreadExecutor to wait for all logs traced up to the onStop call to be flushed

Bug: 416468346
Bug: 410517697
Flag: EXEMPT minor tracing change
Test: atest com.android.internal.protolog.ProcessedPerfettoProtoLogImplTest
Change-Id: Ibc1373daa90bf57dcda53720c2cd1242fec28c9d
parent 6c9e8790
Loading
Loading
Loading
Loading
+30 −30
Original line number Diff line number Diff line
@@ -57,7 +57,6 @@ import android.tracing.perfetto.TracingContext;
import android.util.ArrayMap;
import android.util.ArraySet;
import android.util.Log;
import android.util.LongArray;
import android.util.Slog;
import android.util.proto.ProtoOutputStream;

@@ -82,7 +81,8 @@ import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
@@ -118,7 +118,21 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen
    private final ReadWriteLock mConfigUpdaterLock = new ReentrantReadWriteLock();

    private final Lock mBackgroundServiceLock = new ReentrantLock();
    protected ExecutorService mBackgroundLoggingService = Executors.newSingleThreadExecutor();

    // NOTE: This is a single-thread executor configured with a ThreadPoolExecutor and a
    // LinkedBlockingQueue. This ensures that tasks are executed in FIFO (First-In, First-Out)
    // order, which is crucial for operations like connecting to the configuration service before
    // other logging activities, and synchronizing queued logging tasks on tracing start and stop.
    // Configuration:
    //   corePoolSize: 1 (single thread)
    //   maximumPoolSize: 1 (single thread)
    //   keepAliveTime: 0L (threads do not time out)
    //   workQueue: LinkedBlockingQueue (unbounded, FIFO)
    @VisibleForTesting
    public final ExecutorService mBackgroundLoggingService =
            new ThreadPoolExecutor(1, 1,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>());

    // Set to true once this is ready to accept protolog to logcat requests.
    private boolean mLogcatReady = false;
@@ -399,26 +413,7 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen

    private void onTracingFlush() {
        Log.d(LOG_TAG, "Executing onTracingFlush");

        final ExecutorService loggingService;
        mBackgroundServiceLock.lock();
        try {
            loggingService = mBackgroundLoggingService;
            mBackgroundLoggingService = Executors.newSingleThreadExecutor();
        } finally {
            mBackgroundServiceLock.unlock();
        }

        try {
            loggingService.shutdown();
            boolean finished = loggingService.awaitTermination(10, TimeUnit.SECONDS);

            if (!finished) {
                Log.e(LOG_TAG, "ProtoLog background tracing service didn't finish gracefully.");
            }
        } catch (InterruptedException e) {
            Log.e(LOG_TAG, "Failed to wait for tracing to finish", e);
        }
        waitForExistingBackgroundTasksToComplete();

        if (!android.tracing.Flags.clientSideProtoLogging()) {
            dumpViewerConfig();
@@ -897,6 +892,16 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen
        }
    }

    private void waitForExistingBackgroundTasksToComplete() {
        try {
            this.mBackgroundLoggingService.submit(() -> {
                Log.i(LOG_TAG, "Completed all pending background tasks");
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            Log.e(LOG_TAG, "Failed to wait for tracing service background tasks to complete", e);
        }
    }

    /**
     * This is only used by unit tests to wait until {@link #connectToConfigurationService} is
     * done. Because unit tests are sensitive to concurrent accesses.
@@ -907,13 +912,8 @@ public abstract class PerfettoProtoLogImpl extends IProtoLogClient.Stub implemen
        if (!(currentInstance instanceof PerfettoProtoLogImpl protoLog)) {
            return;
        }
        try {
            protoLog.mBackgroundLoggingService.submit(() -> {
                Log.i(LOG_TAG, "Complete initialization");
            }).get();
        } catch (InterruptedException | ExecutionException e) {
            Log.e(LOG_TAG, "Failed to wait for tracing service", e);
        }

        protoLog.waitForExistingBackgroundTasksToComplete();
    }
}
+87 −0
Original line number Diff line number Diff line
@@ -68,6 +68,9 @@ import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/**
@@ -953,6 +956,90 @@ public class ProcessedPerfettoProtoLogImplTest {
        Truth.assertThat(TestProtoLogGroup.TEST_GROUP.isLogToLogcat()).isFalse();
    }

    @Test
    public void processesAllPendingMessagesBeforeTraceStop()
            throws IOException, InterruptedException {
        // large number of messages to log to stress the queue
        final int numMessages = 1000;
        final CountDownLatch processingHasStartedLatch = new CountDownLatch(1);
        final CountDownLatch allowProcessingToContinueLatch = new CountDownLatch(1);
        final AtomicBoolean blockingTaskStartedExecution = new AtomicBoolean(false);

        // Configure trace monitor to enable all log levels for the test data source.
        PerfettoTraceMonitor traceMonitor = PerfettoTraceMonitor.newBuilder()
                .enableProtoLog(true, List.of(), TEST_PROTOLOG_DATASOURCE_NAME)
                .build();
        try {
            traceMonitor.start();
            assertTrue("ProtoLog should be enabled after starting the trace.",
                    sProtoLog.isProtoEnabled());

            // Submit a task that will block the executor queue.
            sProtoLog.mBackgroundLoggingService.execute(() -> {
                try {
                    blockingTaskStartedExecution.set(true);
                    processingHasStartedLatch.countDown(); // Signal that this task has started
                    // Wait until the main test thread signals to continue
                    if (!allowProcessingToContinueLatch.await(60, TimeUnit.SECONDS)) {
                        // Fail fast if timeout occurs, to avoid test hanging indefinitely
                        Truth.assertWithMessage(
                                "Timeout waiting for allowProcessingToContinueLatch")
                                .fail();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new RuntimeException("Blocking task was interrupted: " + e.getMessage());
                }
            });

            // Wait for the blocking task to actually start executing on the background thread.
            // This ensures it's at the head of the executor's queue before we add more tasks.
            assertTrue("Blocking task did not start execution in time.",
                    processingHasStartedLatch.await(5, TimeUnit.SECONDS));

            // Now, submit all the log messages. They will be queued behind the blocking task.
            for (int i = 0; i < numMessages; i++) {
                sProtoLog.log(LogLevel.DEBUG, TestProtoLogGroup.TEST_GROUP, 1,
                        LogDataType.BOOLEAN, new Object[]{true});
            }

            // Assert that the blocking task is still active (i.e., waiting on the latch),
            // which implies the subsequently submitted log messages are still queued.
            assertTrue("Blocking task should have started execution.",
                    blockingTaskStartedExecution.get());
            Truth.assertWithMessage(
                    "allowProcessingToContinueLatch should not have been counted down yet.")
                    .that(allowProcessingToContinueLatch.getCount())
                    .isEqualTo(1L);

            // Allow the blocking task to complete. This will allow the executor
            // to start processing the queued log messages.
            allowProcessingToContinueLatch.countDown();

            // Stop tracing immediately. The implementation should wait for the
            // mBackgroundLoggingService to process all queued messages (including the
            // now-unblocked first task and all subsequent log messages).
        } finally {
            // Ensure the latch is always counted down if an exception occurred before stop,
            // or if the test is ending, to prevent the background thread from hanging.
            if (allowProcessingToContinueLatch.getCount() > 0) {
                allowProcessingToContinueLatch.countDown();
            }
            traceMonitor.stop(mWriter);
        }

        // Verify that all messages were written to the trace.
        final ResultReader reader = new ResultReader(mWriter.write(), mTraceConfig);
        final ProtoLogTrace protolog = reader.readProtoLogTrace();

        Truth.assertThat(protolog.messages).hasSize(numMessages);
        for (int i = 0; i < numMessages; i++) {
            Truth.assertThat(protolog.messages.get(i).getLevel()).isEqualTo(LogLevel.DEBUG);
            Truth.assertThat(protolog.messages.get(i).getMessage())
                    .isEqualTo("My Test Debug Log Message true");
        }
    }

    private enum TestProtoLogGroup implements IProtoLogGroup {
        TEST_GROUP(true, true, false, "TEST_TAG");