Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f41560b8 authored by Jeff Sharkey's avatar Jeff Sharkey
Browse files

BroadcastQueue: pre-calculate blocked counts.

In an upcoming change we're going to try relaxing setDeliveryState()
to only invalidateRunnableAt() for processes that have just been
unblocked by the operation that just finished.

To enable that shift, we'll need blockedUntilTerminalCount access
at the BroadcastRecord level, so this change shifts to calculating
it when we create the BroadcastRecord; this also saves an extra
traversal of the receivers list to derive the "prioritized" boolean.

Bug: 253906105
Test: atest FrameworksMockingServicesTests:BroadcastRecordTest
Test: atest FrameworksMockingServicesTests:BroadcastQueueTest
Test: atest FrameworksMockingServicesTests:BroadcastQueueModernImplTest
Change-Id: Ie689360d4f2c7af7d936259d9dfa05f663f450a8
parent b84c3cbe
Loading
Loading
Loading
Loading
+12 −28
Original line number Diff line number Diff line
@@ -134,14 +134,6 @@ class BroadcastProcessQueue {
     */
    private int mActiveIndex;

    /**
     * When defined, the receiver actively being dispatched into this process
     * was considered "blocked" until at least the given count of other
     * receivers have reached a terminal state; typically used for ordered
     * broadcasts and priority traunches.
     */
    private int mActiveBlockedUntilTerminalCount;

    /**
     * Count of {@link #mActive} broadcasts that have been dispatched since this
     * queue was last idle.
@@ -206,15 +198,11 @@ class BroadcastProcessQueue {
     * given count of other receivers have reached a terminal state; typically
     * used for ordered broadcasts and priority traunches.
     */
    public void enqueueOrReplaceBroadcast(@NonNull BroadcastRecord record, int recordIndex,
            int blockedUntilTerminalCount) {
    public void enqueueOrReplaceBroadcast(@NonNull BroadcastRecord record, int recordIndex) {
        if (record.isReplacePending()) {
            boolean didReplace = replaceBroadcastInQueue(mPending, record, recordIndex,
                    blockedUntilTerminalCount)
                    || replaceBroadcastInQueue(mPendingUrgent, record, recordIndex,
                            blockedUntilTerminalCount)
                    || replaceBroadcastInQueue(mPendingOffload, record, recordIndex,
                            blockedUntilTerminalCount);
            boolean didReplace = replaceBroadcastInQueue(mPending, record, recordIndex)
                    || replaceBroadcastInQueue(mPendingUrgent, record, recordIndex)
                    || replaceBroadcastInQueue(mPendingOffload, record, recordIndex);
            if (didReplace) {
                return;
            }
@@ -225,7 +213,6 @@ class BroadcastProcessQueue {
        SomeArgs newBroadcastArgs = SomeArgs.obtain();
        newBroadcastArgs.arg1 = record;
        newBroadcastArgs.argi1 = recordIndex;
        newBroadcastArgs.argi2 = blockedUntilTerminalCount;

        // Cross-broadcast prioritization policy:  some broadcasts might warrant being
        // issued ahead of others that are already pending, for example if this new
@@ -244,7 +231,7 @@ class BroadcastProcessQueue {
     * {@code false} otherwise.
     */
    private boolean replaceBroadcastInQueue(@NonNull ArrayDeque<SomeArgs> queue,
            @NonNull BroadcastRecord record, int recordIndex, int blockedUntilTerminalCount) {
            @NonNull BroadcastRecord record, int recordIndex) {
        final Iterator<SomeArgs> it = queue.descendingIterator();
        final Object receiver = record.receivers.get(recordIndex);
        while (it.hasNext()) {
@@ -259,7 +246,6 @@ class BroadcastProcessQueue {
                // Exact match found; perform in-place swap
                args.arg1 = record;
                args.argi1 = recordIndex;
                args.argi2 = blockedUntilTerminalCount;
                onBroadcastDequeued(testRecord, testRecordIndex);
                onBroadcastEnqueued(record, recordIndex);
                return true;
@@ -411,7 +397,6 @@ class BroadcastProcessQueue {
        final SomeArgs next = removeNextBroadcast();
        mActive = (BroadcastRecord) next.arg1;
        mActiveIndex = next.argi1;
        mActiveBlockedUntilTerminalCount = next.argi2;
        mActiveCountSinceIdle++;
        mActiveViaColdStart = false;
        next.recycle();
@@ -424,7 +409,6 @@ class BroadcastProcessQueue {
    public void makeActiveIdle() {
        mActive = null;
        mActiveIndex = 0;
        mActiveBlockedUntilTerminalCount = -1;
        mActiveCountSinceIdle = 0;
        mActiveViaColdStart = false;
        invalidateRunnableAt();
@@ -705,7 +689,7 @@ class BroadcastProcessQueue {
        if (next != null) {
            final BroadcastRecord r = (BroadcastRecord) next.arg1;
            final int index = next.argi1;
            final int blockedUntilTerminalCount = next.argi2;
            final int blockedUntilTerminalCount = r.blockedUntilTerminalCount[index];
            final long runnableAt = r.enqueueTime;

            // We might be blocked waiting for other receivers to finish,
@@ -871,19 +855,19 @@ class BroadcastProcessQueue {
        pw.println();
        pw.increaseIndent();
        if (mActive != null) {
            dumpRecord("ACTIVE", now, pw, mActive, mActiveIndex, mActiveBlockedUntilTerminalCount);
            dumpRecord("ACTIVE", now, pw, mActive, mActiveIndex);
        }
        for (SomeArgs args : mPendingUrgent) {
            final BroadcastRecord r = (BroadcastRecord) args.arg1;
            dumpRecord("URGENT", now, pw, r, args.argi1, args.argi2);
            dumpRecord("URGENT", now, pw, r, args.argi1);
        }
        for (SomeArgs args : mPending) {
            final BroadcastRecord r = (BroadcastRecord) args.arg1;
            dumpRecord(null, now, pw, r, args.argi1, args.argi2);
            dumpRecord(null, now, pw, r, args.argi1);
        }
        for (SomeArgs args : mPendingOffload) {
            final BroadcastRecord r = (BroadcastRecord) args.arg1;
            dumpRecord("OFFLOAD", now, pw, r, args.argi1, args.argi2);
            dumpRecord("OFFLOAD", now, pw, r, args.argi1);
        }
        pw.decreaseIndent();
        pw.println();
@@ -891,8 +875,7 @@ class BroadcastProcessQueue {

    @NeverCompile
    private void dumpRecord(@Nullable String flavor, @UptimeMillisLong long now,
            @NonNull IndentingPrintWriter pw, @NonNull BroadcastRecord record, int recordIndex,
            int blockedUntilTerminalCount) {
            @NonNull IndentingPrintWriter pw, @NonNull BroadcastRecord record, int recordIndex) {
        TimeUtils.formatDuration(record.enqueueTime, now, pw);
        pw.print(' ');
        pw.println(record.toShortString());
@@ -918,6 +901,7 @@ class BroadcastProcessQueue {
            pw.print(info.activityInfo.name);
        }
        pw.println();
        final int blockedUntilTerminalCount = record.blockedUntilTerminalCount[recordIndex];
        if (blockedUntilTerminalCount != -1) {
            pw.print("    blocked until ");
            pw.print(blockedUntilTerminalCount);
+1 −27
Original line number Diff line number Diff line
@@ -32,7 +32,6 @@ import static com.android.server.am.BroadcastProcessQueue.reasonToString;
import static com.android.server.am.BroadcastProcessQueue.removeFromRunnableList;
import static com.android.server.am.BroadcastRecord.deliveryStateToString;
import static com.android.server.am.BroadcastRecord.getReceiverPackageName;
import static com.android.server.am.BroadcastRecord.getReceiverPriority;
import static com.android.server.am.BroadcastRecord.getReceiverProcessName;
import static com.android.server.am.BroadcastRecord.getReceiverUid;
import static com.android.server.am.BroadcastRecord.isDeliveryStateTerminal;
@@ -592,36 +591,11 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
        r.enqueueRealTime = SystemClock.elapsedRealtime();
        r.enqueueClockTime = System.currentTimeMillis();

        int lastPriority = 0;
        int lastPriorityIndex = 0;

        for (int i = 0; i < r.receivers.size(); i++) {
            final Object receiver = r.receivers.get(i);
            final BroadcastProcessQueue queue = getOrCreateProcessQueue(
                    getReceiverProcessName(receiver), getReceiverUid(receiver));

            final int blockedUntilTerminalCount;
            if (r.ordered) {
                // When sending an ordered broadcast, we need to block this
                // receiver until all previous receivers have terminated
                blockedUntilTerminalCount = i;
            } else if (r.prioritized) {
                // When sending a prioritized broadcast, we only need to wait
                // for the previous traunch of receivers to be terminated
                final int thisPriority = getReceiverPriority(receiver);
                if ((i == 0) || (thisPriority != lastPriority)) {
                    lastPriority = thisPriority;
                    lastPriorityIndex = i;
                    blockedUntilTerminalCount = i;
                } else {
                    blockedUntilTerminalCount = lastPriorityIndex;
                }
            } else {
                // Otherwise we don't need to block at all
                blockedUntilTerminalCount = -1;
            }

            queue.enqueueOrReplaceBroadcast(r, i, blockedUntilTerminalCount);
            queue.enqueueOrReplaceBroadcast(r, i);
            updateRunnableList(queue);
            enqueueUpdateRunningList();
        }
+53 −12
Original line number Diff line number Diff line
@@ -96,6 +96,7 @@ final class BroadcastRecord extends Binder {
    final @Nullable BroadcastOptions options; // BroadcastOptions supplied by caller
    final @NonNull List<Object> receivers;   // contains BroadcastFilter and ResolveInfo
    final @DeliveryState int[] delivery;   // delivery state of each receiver
    final int[] blockedUntilTerminalCount; // blocked until count of each receiver
    @Nullable ProcessRecord resultToApp; // who receives final result if non-null
    @Nullable IIntentReceiver resultTo; // who receives final result if non-null
    boolean deferred;
@@ -375,6 +376,7 @@ final class BroadcastRecord extends Binder {
        options = _options;
        receivers = (_receivers != null) ? _receivers : EMPTY_RECEIVERS;
        delivery = new int[_receivers != null ? _receivers.size() : 0];
        blockedUntilTerminalCount = calculateBlockedUntilTerminalCount(receivers, _serialized);
        scheduledTime = new long[delivery.length];
        terminalTime = new long[delivery.length];
        resultToApp = _resultToApp;
@@ -385,7 +387,7 @@ final class BroadcastRecord extends Binder {
        ordered = _serialized;
        sticky = _sticky;
        initialSticky = _initialSticky;
        prioritized = isPrioritized(receivers);
        prioritized = isPrioritized(blockedUntilTerminalCount, _serialized);
        userId = _userId;
        nextReceiver = 0;
        state = IDLE;
@@ -427,6 +429,7 @@ final class BroadcastRecord extends Binder {
        options = from.options;
        receivers = from.receivers;
        delivery = from.delivery;
        blockedUntilTerminalCount = from.blockedUntilTerminalCount;
        scheduledTime = from.scheduledTime;
        terminalTime = from.terminalTime;
        resultToApp = from.resultToApp;
@@ -690,22 +693,60 @@ final class BroadcastRecord extends Binder {
    }

    /**
     * Return if given receivers list has more than one traunch of priorities.
     * Determine if the result of {@link #calculateBlockedUntilTerminalCount}
     * has prioritized tranches of receivers.
     */
    @VisibleForTesting
    static boolean isPrioritized(@NonNull List<Object> receivers) {
        int firstPriority = 0;
        for (int i = 0; i < receivers.size(); i++) {
    static boolean isPrioritized(@NonNull int[] blockedUntilTerminalCount,
            boolean ordered) {
        return !ordered && (blockedUntilTerminalCount.length > 0)
                && (blockedUntilTerminalCount[0] != -1);
    }

    /**
     * Calculate the {@link #terminalCount} that each receiver should be
     * considered blocked until.
     * <p>
     * For example, in an ordered broadcast, receiver {@code N} is blocked until
     * receiver {@code N-1} reaches a terminal state. Similarly, in a
     * prioritized broadcast, receiver {@code N} is blocked until all receivers
     * of a higher priority reach a terminal state.
     * <p>
     * When there are no terminal count constraints, the blocked value for each
     * receiver is {@code -1}.
     */
    @VisibleForTesting
    static @NonNull int[] calculateBlockedUntilTerminalCount(
            @NonNull List<Object> receivers, boolean ordered) {
        final int N = receivers.size();
        final int[] blockedUntilTerminalCount = new int[N];
        int lastPriority = 0;
        int lastPriorityIndex = 0;
        for (int i = 0; i < N; i++) {
            if (ordered) {
                // When sending an ordered broadcast, we need to block this
                // receiver until all previous receivers have terminated
                blockedUntilTerminalCount[i] = i;
            } else {
                // When sending a prioritized broadcast, we only need to wait
                // for the previous tranche of receivers to be terminated
                final int thisPriority = getReceiverPriority(receivers.get(i));
            if (i == 0) {
                firstPriority = thisPriority;
            } else if (thisPriority != firstPriority) {
                return true;
                if ((i == 0) || (thisPriority != lastPriority)) {
                    lastPriority = thisPriority;
                    lastPriorityIndex = i;
                    blockedUntilTerminalCount[i] = i;
                } else {
                    blockedUntilTerminalCount[i] = lastPriorityIndex;
                }
            }
        return false;
        }

        // If the entire list is in the same priority tranche, mark as -1 to
        // indicate that none of them need to wait
        if (N > 0 && blockedUntilTerminalCount[N - 1] == 0) {
            Arrays.fill(blockedUntilTerminalCount, -1);
        }
        return blockedUntilTerminalCount;
    }

    static int getReceiverUid(@NonNull Object receiver) {
        if (receiver instanceof BroadcastFilter) {
+8 −7
Original line number Diff line number Diff line
@@ -25,6 +25,7 @@ import static com.android.server.am.BroadcastQueueTest.PACKAGE_RED;
import static com.android.server.am.BroadcastQueueTest.PACKAGE_YELLOW;
import static com.android.server.am.BroadcastQueueTest.getUidForPackage;
import static com.android.server.am.BroadcastQueueTest.makeManifestReceiver;
import static com.android.server.am.BroadcastQueueTest.withPriority;

import static com.google.common.truth.Truth.assertThat;

@@ -283,7 +284,7 @@ public class BroadcastQueueModernImplTest {

        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0, 0);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0);

        queue.setProcessCached(false);
        final long notCachedRunnableAt = queue.getRunnableAt();
@@ -305,12 +306,12 @@ public class BroadcastQueueModernImplTest {
        // enqueue a bg-priority broadcast then a fg-priority one
        final Intent timezone = new Intent(Intent.ACTION_TIMEZONE_CHANGED);
        final BroadcastRecord timezoneRecord = makeBroadcastRecord(timezone);
        queue.enqueueOrReplaceBroadcast(timezoneRecord, 0, 0);
        queue.enqueueOrReplaceBroadcast(timezoneRecord, 0);

        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        airplane.addFlags(Intent.FLAG_RECEIVER_FOREGROUND);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0, 0);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0);

        // verify that:
        // (a) the queue is immediately runnable by existence of a fg-priority broadcast
@@ -339,9 +340,9 @@ public class BroadcastQueueModernImplTest {

        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane, null,
                List.of(makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN),
                        makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN)), true);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 1, 1);
                List.of(withPriority(makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN), 10),
                        withPriority(makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN), 0)), true);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 1);

        assertFalse(queue.isRunnable());
        assertEquals(BroadcastProcessQueue.REASON_BLOCKED, queue.getRunnableAtReason());
@@ -363,7 +364,7 @@ public class BroadcastQueueModernImplTest {

        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0, 0);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0);

        mConstants.MAX_PENDING_BROADCASTS = 128;
        queue.invalidateRunnableAt();
+5 −0
Original line number Diff line number Diff line
@@ -503,6 +503,11 @@ public class BroadcastQueueTest {
        return ai;
    }

    static ResolveInfo withPriority(ResolveInfo info, int priority) {
        info.priority = priority;
        return info;
    }

    static ResolveInfo makeManifestReceiver(String packageName, String name) {
        return makeManifestReceiver(packageName, name, UserHandle.USER_SYSTEM);
    }
Loading