Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 4493eb69 authored by Kevin Jeon's avatar Kevin Jeon
Browse files

Add benchmark for concurrent enqueues and removes

This test enqueues message on one thread before removing those messages
from another thread. Remove-threads wait for their paired
enqueue-threads to complete before beginning. These enqueue-remove
thread pairs run concurrently.

Test: atest MessageQueuePerfTest
Bug: 422736500
Flag: EXEMPT benchmark
Change-Id: I2ac1218ff7a8e28ae413b90716d7523626002881
parent f7d85cbc
Loading
Loading
Loading
Loading
+169 −20
Original line number Diff line number Diff line
@@ -32,15 +32,15 @@ import androidx.test.InstrumentationRegistry;
import androidx.test.filters.LargeTest;
import androidx.test.runner.AndroidJUnit4;

import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.Random;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.CountDownLatch;

/**
 * Performance tests for {@link MessageQueue}.
 */
@@ -53,9 +53,6 @@ public class MessageQueuePerfTest {
    private static final int TOTAL_MESSAGE_COUNT = PER_THREAD_MESSAGE_COUNT * THREAD_COUNT;
    private static final int DEFAULT_MESSAGE_WHAT = 2;

    static Object sLock = new Object();
    private ArrayList<Long> mResults;

    @Before
    public void setUp() {
        mHandlerThread = new HandlerThread("MessageQueuePerfTest");
@@ -67,13 +64,14 @@ public class MessageQueuePerfTest {
        mHandlerThread.quitSafely();
    }

    class EnqueueThread extends Thread {
    static class EnqueueThread extends Thread {
        CountDownLatch mStartLatch;
        CountDownLatch mEndLatch;
        Handler mHandler;
        int mMessageStartIdx;
        Message[] mMessages;
        long[] mDelays;
        ArrayList<Long> mResults;

        EnqueueThread(CountDownLatch startLatch, CountDownLatch endLatch, Handler handler,
                int startIdx, Message[] messages, long[] delays) {
@@ -84,6 +82,7 @@ public class MessageQueuePerfTest {
            mMessageStartIdx = startIdx;
            mMessages = messages;
            mDelays = delays;
            mResults = new ArrayList<>();
        }

        @Override
@@ -92,7 +91,7 @@ public class MessageQueuePerfTest {
            try {
                mStartLatch.await();
            } catch (InterruptedException e) {

                Thread.currentThread().interrupt();
            }
            long now = SystemClock.uptimeMillis();
            long startTimeNS = SystemClock.elapsedRealtimeNanos();
@@ -105,11 +104,52 @@ public class MessageQueuePerfTest {
            }
            long endTimeNS = SystemClock.elapsedRealtimeNanos();

            synchronized (sLock) {
            mResults.add(endTimeNS - startTimeNS);
            mEndLatch.countDown();
        }
    }

    static class RemoveThread extends Thread {
        CountDownLatch mStartLatch;
        CountDownLatch mEndLatch;
        Handler mHandler;
        Thread mBlockingThread;
        int mWhat;
        ArrayList<Long> mResults;

        RemoveThread(CountDownLatch startLatch, CountDownLatch endLatch, Handler handler,
                Thread blockingThread, int what) {
            super();
            mStartLatch = startLatch;
            mEndLatch = endLatch;
            mHandler = handler;
            mBlockingThread = blockingThread;
            mWhat = what;
            mResults = new ArrayList<>();
        }

        @Override
        public void run() {
            Log.d(TAG, "Remove thread started for message " + mWhat);
            try {
                mStartLatch.await();
                // We wait for mBlockingThread to complete in case it is still in the process of
                // enqueuing messages, to ensure that the expected number of messages is removed.
                if (mBlockingThread != null) {
                    mBlockingThread.join();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            long now = SystemClock.uptimeMillis();
            long startTimeNS = SystemClock.elapsedRealtimeNanos();
            mHandler.removeMessages(mWhat);
            long endTimeNS = SystemClock.elapsedRealtimeNanos();

            mResults.add(endTimeNS - startTimeNS);
            mEndLatch.countDown();
        }

    }

    class TestHandler extends Handler {
@@ -120,9 +160,24 @@ public class MessageQueuePerfTest {
        public void handleMessage(Message msg) { }
    }

    void reportPerf(String prefix, int threadCount, int perThreadMessageCount) {
    void reportPerf(String prefix, int threadCount, int perThreadMessageCount,
            EnqueueThread[] enqueueThreads, RemoveThread[] removeThreads) {
        Instrumentation instrumentation = InstrumentationRegistry.getInstrumentation();
        Stats stats = new Stats(mResults);

        // Accumulate enqueue/remove results.
        ArrayList<Long> enqueueResults = new ArrayList<>();
        for (EnqueueThread thread : enqueueThreads) {
            enqueueResults.addAll(thread.mResults);
        }
        Stats stats = new Stats(enqueueResults);

        ArrayList<Long> removeResults = new ArrayList<>();
        if (removeThreads != null) {
            for (RemoveThread thread : removeThreads) {
                removeResults.addAll(thread.mResults);
            }
        }
        Stats removeStats = (removeResults.size() > 0) ? new Stats(removeResults) : null;

        Log.d(TAG, "Reporting perf now");

@@ -132,6 +187,13 @@ public class MessageQueuePerfTest {
        status.putLong(prefix + "_min_ns", stats.getMin());
        status.putLong(prefix + "_max_ns", stats.getMax());
        status.putLong(prefix + "_stddev_ns", (long) stats.getStandardDeviation());
        if (removeStats != null) {
            status.putLong(prefix + "_remove_median_ns", removeStats.getMedian());
            status.putLong(prefix + "_remove_mean_ns", (long) removeStats.getMean());
            status.putLong(prefix + "_remove_min_ns", removeStats.getMin());
            status.putLong(prefix + "_remove_max_ns", removeStats.getMax());
            status.putLong(prefix + "_remove_stddev_ns", (long) removeStats.getStandardDeviation());
        }
        status.putLong(prefix + "_nr_threads", threadCount);
        status.putLong(prefix + "_msgs_per_thread", perThreadMessageCount);
        instrumentation.sendStatus(Activity.RESULT_OK, status);
@@ -139,22 +201,33 @@ public class MessageQueuePerfTest {

    HandlerThread mHandlerThread;

    private void fillMessagesArray(Message[] messages) {
        for (int i = 0; i < messages.length; i++) {
            messages[i] = mHandlerThread.getThreadHandler().obtainMessage(DEFAULT_MESSAGE_WHAT);
    private void fillMessagesArray(Message[] messages, int what, int startIdx, int endIdx) {
        for (int i = startIdx; i < endIdx; i++) {
            messages[i] = mHandlerThread.getThreadHandler().obtainMessage(what);
        }
    }

    private void fillMessagesArray(Message[] messages) {
        fillMessagesArray(messages, DEFAULT_MESSAGE_WHAT, 0, messages.length);
    }

    private void startTestAndWaitOnThreads(CountDownLatch threadStartLatch, CountDownLatch threadEndLatch) {
        try {
            threadStartLatch.countDown();
            Log.e(TAG, "Test threads started");
            threadEndLatch.await();
        } catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();
        }
        Log.e(TAG, "Test threads ended, quitting handler thread");
    }

    /**
     * Benchmark for enqueueing messages at the front of the message queue.
     *
     * <p> This benchmark adds messages to the front of the message queue from multiple threads. It
     * measures the latency of enqueue operations.
     */
    @Test
    public void benchmarkEnqueueAtFrontOfQueue() {
        CountDownLatch threadStartLatch = new CountDownLatch(1);
@@ -163,18 +236,19 @@ public class MessageQueuePerfTest {
        fillMessagesArray(messages);

        long[] delays = new long[TOTAL_MESSAGE_COUNT];
        mResults = new ArrayList<>();

        TestHandler handler = new TestHandler(mHandlerThread.getLooper());
        EnqueueThread[] enqueueThreads = new EnqueueThread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            EnqueueThread thread = new EnqueueThread(threadStartLatch, threadEndLatch, handler,
                    i * PER_THREAD_MESSAGE_COUNT, messages, delays);
            enqueueThreads[i] = thread;
            thread.start();
        }

        startTestAndWaitOnThreads(threadStartLatch, threadEndLatch);

        reportPerf("enqueueAtFront", THREAD_COUNT, PER_THREAD_MESSAGE_COUNT);
        reportPerf("enqueueAtFront", THREAD_COUNT, PER_THREAD_MESSAGE_COUNT, enqueueThreads, null);
    }

    /**
@@ -189,6 +263,12 @@ public class MessageQueuePerfTest {
        return delays;
    }

    /**
     * Benchmark for enqueuing delayed messages to the message queue.
     *
     * <p> This benchmark adds messages at random points in the message queue from multiple threads.
     * It measures the latency of enqueue operations.
     */
    @Test
    public void benchmarkEnqueueDelayed() {
        CountDownLatch threadStartLatch = new CountDownLatch(1);
@@ -197,20 +277,89 @@ public class MessageQueuePerfTest {
        fillMessagesArray(messages);

        long[] delays = fillDelayArray();
        mResults = new ArrayList<>();

        TestHandler handler = new TestHandler(mHandlerThread.getLooper());
        EnqueueThread[] enqueueThreads = new EnqueueThread[THREAD_COUNT];
        for (int i = 0; i < THREAD_COUNT; i++) {
            EnqueueThread thread = new EnqueueThread(threadStartLatch, threadEndLatch, handler,
                    i * PER_THREAD_MESSAGE_COUNT, messages, delays);
            enqueueThreads[i] = thread;
            thread.start();
        }

        startTestAndWaitOnThreads(threadStartLatch, threadEndLatch);

        reportPerf("enqueueDelayed", THREAD_COUNT, PER_THREAD_MESSAGE_COUNT);
        reportPerf("enqueueDelayed", THREAD_COUNT, PER_THREAD_MESSAGE_COUNT, enqueueThreads, null);
    }

    /**
     * Benchmark for enqueuing delayed messages and removing them from the message queue.
     *
     * <p> This benchmark adds messages at random points in the message queue from multiple threads,
     * with each thread enqueuing messages with a different 'what' field. After a thread has
     * completed adding its messages, another thread removes them. This measures the latency of
     * enqueue and remove operations.
     */
    @Test
    public void benchmarkConcurrentEnqueueDelayedAndRemove() {
        // taskThreadCount threads are used for both enqueuing and removing.
        final int taskThreadCount = THREAD_COUNT / 2;
        final int messageCount = taskThreadCount * PER_THREAD_MESSAGE_COUNT;

        // We use taskThreadCount * 2 in case THREAD_COUNT is not an even number.
        CountDownLatch threadStartLatch = new CountDownLatch(1);
        CountDownLatch threadEndLatch  = new CountDownLatch(taskThreadCount * 2);

        long[] delays = fillDelayArray();
        TestHandler handler = new TestHandler(mHandlerThread.getLooper());

        // Fill with taskThreadCount blocks of PER_THREAD_MESSAGE_COUNT messages.
        Message[] messages = new Message[messageCount];
        for (int i = 0; i < taskThreadCount; i++) {
            fillMessagesArray(messages,
                    /* what = */ i, /* startIdx = */ i * PER_THREAD_MESSAGE_COUNT,
                    /* endIdx = */ (i + 1) * PER_THREAD_MESSAGE_COUNT);
        }

        EnqueueThread[] enqueueThreads = new EnqueueThread[taskThreadCount];
        RemoveThread[] removeThreads = new RemoveThread[taskThreadCount];

        // Start by enqueuing the first block of messages.
        enqueueThreads[0] = new EnqueueThread(threadStartLatch, threadEndLatch, handler,
                /* startIdx = */ 0, messages, delays);
        enqueueThreads[0].start();

        for (int i = 1; i < taskThreadCount; i++) {
            // Remove messages from the corresponding enqueue thread from the previous iteration.
            removeThreads[i - 1] = new RemoveThread(
                    threadStartLatch, threadEndLatch, handler, enqueueThreads[i - 1],
                    /* what = */ i - 1);
            removeThreads[i - 1].start();

            // Concurrently enqueue the next set of messages.
            enqueueThreads[i] = new EnqueueThread(threadStartLatch, threadEndLatch,
                    handler, i * PER_THREAD_MESSAGE_COUNT, messages, delays);
            enqueueThreads[i].start();
        }

        // End by removing the last block of messages.
        removeThreads[taskThreadCount - 1] = new RemoveThread(
                threadStartLatch, threadEndLatch, handler, enqueueThreads[taskThreadCount - 1],
                /* what = */ taskThreadCount - 1);
        removeThreads[taskThreadCount - 1].start();

        startTestAndWaitOnThreads(threadStartLatch, threadEndLatch);

        reportPerf("concurrentEnqueueDelayedAndRemove", THREAD_COUNT, PER_THREAD_MESSAGE_COUNT,
                enqueueThreads, removeThreads);
    }

    /**
     * Benchmark for enqueueing and removing messages from a single thread.
     *
     * <p> This benchmark measures the time it takes to enqueue a message, then remove it. This is
     * repeated multiple times.
     */
    @Test
    public void benchmarkSingleThreadedEnqueueAndRemove() throws InterruptedException {
        final CountDownLatch threadEndLatch  = new CountDownLatch(1);