Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 297621c0 authored by Jeff Sharkey's avatar Jeff Sharkey
Browse files

BroadcastQueue: add support for priority tranches.

Both manifest and registered receivers can specify a "priority" in
which they'd like to be delivered.  This can be used to ensure that
system components receieve a broadcast (and can update their internal
state) before third-party apps receive that broadcast.

The "default" stack handled this by dispatching all receivers in
the order defined by ActivityManagerService, but the "modern" stack
now needs to carefully only allow parallel dispatch within a specific
tranche of consistent priority.  (That is, we must finish dispatching
all receivers with priority "10" before moving on to priority "9".)

This change repurposes the logic we built for ordered broadcasts,
where a process queue is "blocked" while waiting on a certain number
of prior broadcasts to reach a terminal state.

To avoid priority inversions (where a lower priority receiver would be
blocked behind a higher priority receiver in a frozen state), we
currently promote all receivers waiting on a "prioritized" broadcast
to be immediately runnable.

Bug: 245771249
Test: atest FrameworksMockingServicesTests:BroadcastQueueTest
Test: atest FrameworksMockingServicesTests:BroadcastQueueModernImplTest
Change-Id: Ic6ff9a959a262bd73ecd3d7977f400d10b81535d
parent d82a8d2f
Loading
Loading
Loading
Loading
+49 −16
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
package com.android.server.am;

import static com.android.server.am.BroadcastRecord.deliveryStateToString;
import static com.android.server.am.BroadcastRecord.isDeliveryStateTerminal;

import android.annotation.IntDef;
import android.annotation.NonNull;
@@ -118,6 +119,7 @@ class BroadcastProcessQueue {
    private int mCountForeground;
    private int mCountOrdered;
    private int mCountAlarm;
    private int mCountPrioritized;

    private @UptimeMillisLong long mRunnableAt = Long.MAX_VALUE;
    private @Reason int mRunnableAtReason = REASON_EMPTY;
@@ -139,8 +141,13 @@ class BroadcastProcessQueue {
     * Enqueue the given broadcast to be dispatched to this process at some
     * future point in time. The target receiver is indicated by the given index
     * into {@link BroadcastRecord#receivers}.
     * <p>
     * When defined, this receiver is considered "blocked" until at least the
     * given count of other receivers have reached a terminal state; typically
     * used for ordered broadcasts and priority traunches.
     */
    public void enqueueBroadcast(@NonNull BroadcastRecord record, int recordIndex) {
    public void enqueueBroadcast(@NonNull BroadcastRecord record, int recordIndex,
            int blockedUntilTerminalCount) {
        // Detect situations where the incoming broadcast should cause us to
        // recalculate when we'll be runnable
        if (mPending.isEmpty()) {
@@ -158,9 +165,14 @@ class BroadcastProcessQueue {
            mCountAlarm++;
            invalidateRunnableAt();
        }
        if (record.prioritized) {
            mCountPrioritized++;
            invalidateRunnableAt();
        }
        SomeArgs args = SomeArgs.obtain();
        args.arg1 = record;
        args.argi1 = recordIndex;
        args.argi2 = blockedUntilTerminalCount;
        mPending.addLast(args);
    }

@@ -236,8 +248,10 @@ class BroadcastProcessQueue {
                && (mActive.isForeground() || mActive.ordered || mActive.alarm)) {
            // We have an important broadcast right now, so boost priority
            return ProcessList.SCHED_GROUP_DEFAULT;
        } else {
        } else if (!isIdle()) {
            return ProcessList.SCHED_GROUP_BACKGROUND;
        } else {
            return ProcessList.SCHED_GROUP_UNDEFINED;
        }
    }

@@ -277,6 +291,9 @@ class BroadcastProcessQueue {
        if (mActive.alarm) {
            mCountAlarm--;
        }
        if (mActive.prioritized) {
            mCountPrioritized--;
        }
        invalidateRunnableAt();
    }

@@ -342,6 +359,14 @@ class BroadcastProcessQueue {
        return mActive != null;
    }

    /**
     * Quickly determine if this queue has broadcasts that are still waiting to
     * be delivered at some point in the future.
     */
    public boolean isIdle() {
        return !isActive() && isEmpty();
    }

    public boolean isRunnable() {
        if (mRunnableAtInvalidated) updateRunnableAt();
        return mRunnableAt != Long.MAX_VALUE;
@@ -374,24 +399,26 @@ class BroadcastProcessQueue {
        mRunnableAtInvalidated = true;
    }

    private static final int REASON_EMPTY = 0;
    private static final int REASON_CONTAINS_FOREGROUND = 1;
    private static final int REASON_CONTAINS_ORDERED = 2;
    private static final int REASON_CONTAINS_ALARM = 3;
    private static final int REASON_CACHED = 4;
    private static final int REASON_NORMAL = 5;
    private static final int REASON_MAX_PENDING = 6;
    private static final int REASON_BLOCKED_ORDERED = 7;
    static final int REASON_EMPTY = 0;
    static final int REASON_CONTAINS_FOREGROUND = 1;
    static final int REASON_CONTAINS_ORDERED = 2;
    static final int REASON_CONTAINS_ALARM = 3;
    static final int REASON_CONTAINS_PRIORITIZED = 4;
    static final int REASON_CACHED = 5;
    static final int REASON_NORMAL = 6;
    static final int REASON_MAX_PENDING = 7;
    static final int REASON_BLOCKED = 8;

    @IntDef(flag = false, prefix = { "REASON_" }, value = {
            REASON_EMPTY,
            REASON_CONTAINS_FOREGROUND,
            REASON_CONTAINS_ORDERED,
            REASON_CONTAINS_ALARM,
            REASON_CONTAINS_PRIORITIZED,
            REASON_CACHED,
            REASON_NORMAL,
            REASON_MAX_PENDING,
            REASON_BLOCKED_ORDERED,
            REASON_BLOCKED,
    })
    @Retention(RetentionPolicy.SOURCE)
    public @interface Reason {}
@@ -402,10 +429,11 @@ class BroadcastProcessQueue {
            case REASON_CONTAINS_FOREGROUND: return "CONTAINS_FOREGROUND";
            case REASON_CONTAINS_ORDERED: return "CONTAINS_ORDERED";
            case REASON_CONTAINS_ALARM: return "CONTAINS_ALARM";
            case REASON_CONTAINS_PRIORITIZED: return "CONTAINS_PRIORITIZED";
            case REASON_CACHED: return "CACHED";
            case REASON_NORMAL: return "NORMAL";
            case REASON_MAX_PENDING: return "MAX_PENDING";
            case REASON_BLOCKED_ORDERED: return "BLOCKED_ORDERED";
            case REASON_BLOCKED: return "BLOCKED";
            default: return Integer.toString(reason);
        }
    }
@@ -418,13 +446,15 @@ class BroadcastProcessQueue {
        if (next != null) {
            final BroadcastRecord r = (BroadcastRecord) next.arg1;
            final int index = next.argi1;
            final int blockedUntilTerminalCount = next.argi2;
            final long runnableAt = r.enqueueTime;

            // If our next broadcast is ordered, and we're not the next receiver
            // in line, then we're not runnable at all
            if (r.ordered && r.finishedCount != index) {
            // We might be blocked waiting for other receivers to finish,
            // typically for an ordered broadcast or priority traunches
            if (r.terminalCount < blockedUntilTerminalCount
                    && !isDeliveryStateTerminal(r.getDeliveryState(index))) {
                mRunnableAt = Long.MAX_VALUE;
                mRunnableAtReason = REASON_BLOCKED_ORDERED;
                mRunnableAtReason = REASON_BLOCKED;
                return;
            }

@@ -445,6 +475,9 @@ class BroadcastProcessQueue {
            } else if (mCountAlarm > 0) {
                mRunnableAt = runnableAt;
                mRunnableAtReason = REASON_CONTAINS_ALARM;
            } else if (mCountPrioritized > 0) {
                mRunnableAt = runnableAt;
                mRunnableAtReason = REASON_CONTAINS_PRIORITIZED;
            } else if (mProcessCached) {
                mRunnableAt = runnableAt + constants.DELAY_CACHED_MILLIS;
                mRunnableAtReason = REASON_CACHED;
+51 −18
Original line number Diff line number Diff line
@@ -31,6 +31,7 @@ import static com.android.server.am.BroadcastProcessQueue.reasonToString;
import static com.android.server.am.BroadcastProcessQueue.removeFromRunnableList;
import static com.android.server.am.BroadcastRecord.deliveryStateToString;
import static com.android.server.am.BroadcastRecord.getReceiverPackageName;
import static com.android.server.am.BroadcastRecord.getReceiverPriority;
import static com.android.server.am.BroadcastRecord.getReceiverProcessName;
import static com.android.server.am.BroadcastRecord.getReceiverUid;
import static com.android.server.am.BroadcastRecord.isDeliveryStateTerminal;
@@ -500,11 +501,36 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
        r.enqueueRealTime = SystemClock.elapsedRealtime();
        r.enqueueClockTime = System.currentTimeMillis();

        int lastPriority = 0;
        int lastPriorityIndex = 0;

        for (int i = 0; i < r.receivers.size(); i++) {
            final Object receiver = r.receivers.get(i);
            final BroadcastProcessQueue queue = getOrCreateProcessQueue(
                    getReceiverProcessName(receiver), getReceiverUid(receiver));
            queue.enqueueBroadcast(r, i);

            final int blockedUntilTerminalCount;
            if (r.ordered) {
                // When sending an ordered broadcast, we need to block this
                // receiver until all previous receivers have terminated
                blockedUntilTerminalCount = i;
            } else if (r.prioritized) {
                // When sending a prioritized broadcast, we only need to wait
                // for the previous traunch of receivers to be terminated
                final int thisPriority = getReceiverPriority(receiver);
                if ((i == 0) || (thisPriority != lastPriority)) {
                    lastPriority = thisPriority;
                    lastPriorityIndex = i;
                    blockedUntilTerminalCount = i;
                } else {
                    blockedUntilTerminalCount = lastPriorityIndex;
                }
            } else {
                // Otherwise we don't need to block at all
                blockedUntilTerminalCount = 0;
            }

            queue.enqueueBroadcast(r, i, blockedUntilTerminalCount);
            updateRunnableList(queue);
            enqueueUpdateRunningList();
        }
@@ -579,7 +605,7 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
        final int index = queue.getActiveIndex();
        final Object receiver = r.receivers.get(index);

        if (r.finishedCount == 0) {
        if (r.terminalCount == 0) {
            r.dispatchTime = SystemClock.uptimeMillis();
            r.dispatchRealTime = SystemClock.elapsedRealtime();
            r.dispatchClockTime = System.currentTimeMillis();
@@ -720,7 +746,7 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
        // When the caller aborted an ordered broadcast, we mark all remaining
        // receivers as skipped
        if (r.ordered && r.resultAbort) {
            for (int i = r.finishedCount + 1; i < r.receivers.size(); i++) {
            for (int i = r.terminalCount + 1; i < r.receivers.size(); i++) {
                setDeliveryState(null, null, r, i, r.receivers.get(i),
                        BroadcastRecord.DELIVERY_SKIPPED);
            }
@@ -814,23 +840,30 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
                        + deliveryStateToString(newDeliveryState));
            }

            r.finishedCount++;
            r.terminalCount++;
            notifyFinishReceiver(queue, r, index, receiver);

            if (r.ordered) {
                if (r.finishedCount < r.receivers.size()) {
                    // We just finished an ordered receiver, which means the
                    // next receiver might now be runnable
                    final Object nextReceiver = r.receivers.get(r.finishedCount);
                    final BroadcastProcessQueue nextQueue = getProcessQueue(
                            getReceiverProcessName(nextReceiver), getReceiverUid(nextReceiver));
                    nextQueue.invalidateRunnableAt();
                    updateRunnableList(nextQueue);
                    enqueueUpdateRunningList();
                } else {
                    // Everything finished, so deliver final result
            // When entire ordered broadcast finished, deliver final result
            if (r.ordered && (r.terminalCount == r.receivers.size())) {
                scheduleResultTo(r);
            }

            // Our terminal state here might be enough for another process
            // blocked on us to now be runnable
            if (r.ordered || r.prioritized) {
                for (int i = 0; i < r.receivers.size(); i++) {
                    if (!isDeliveryStateTerminal(getDeliveryState(r, i)) || (i == index)) {
                        final Object otherReceiver = r.receivers.get(i);
                        final BroadcastProcessQueue otherQueue = getProcessQueue(
                                getReceiverProcessName(otherReceiver),
                                getReceiverUid(otherReceiver));
                        if (otherQueue != null) {
                            otherQueue.invalidateRunnableAt();
                            updateRunnableList(otherQueue);
                        }
                    }
                }
                enqueueUpdateRunningList();
            }
        }
    }
@@ -1148,7 +1181,7 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
        FrameworkStatsLog.write(BROADCAST_DELIVERY_EVENT_REPORTED, uid, senderUid, actionName,
                receiverType, type, dispatchDelay, receiveDelay, finishDelay);

        final boolean recordFinished = (r.finishedCount == r.receivers.size());
        final boolean recordFinished = (r.terminalCount == r.receivers.size());
        if (recordFinished) {
            mHistory.addBroadcastToHistoryLocked(r);

+29 −2
Original line number Diff line number Diff line
@@ -45,7 +45,6 @@ import android.os.IBinder;
import android.os.SystemClock;
import android.os.UserHandle;
import android.util.PrintWriterPrinter;
import android.util.Slog;
import android.util.SparseArray;
import android.util.TimeUtils;
import android.util.proto.ProtoOutputStream;
@@ -82,6 +81,7 @@ final class BroadcastRecord extends Binder {
    final boolean pushMessage; // originated from a push message?
    final boolean pushMessageOverQuota; // originated from a push message which was over quota?
    final boolean initialSticky; // initial broadcast from register to sticky?
    final boolean prioritized; // contains more than one priority tranche
    final int userId;       // user id this broadcast was for
    final String resolvedType; // the resolved data type
    final String[] requiredPermissions; // permissions the caller has required
@@ -115,7 +115,7 @@ final class BroadcastRecord extends Binder {
    int anrCount;           // has this broadcast record hit any ANRs?
    int manifestCount;      // number of manifest receivers dispatched.
    int manifestSkipCount;  // number of manifest receivers skipped.
    int finishedCount;      // number of receivers finished.
    int terminalCount;      // number of receivers in terminal state.
    BroadcastQueue queue;   // the outbound queue handling this broadcast

    // if set to true, app's process will be temporarily allowed to start activities from background
@@ -375,6 +375,7 @@ final class BroadcastRecord extends Binder {
        ordered = _serialized;
        sticky = _sticky;
        initialSticky = _initialSticky;
        prioritized = isPrioritized(receivers);
        userId = _userId;
        nextReceiver = 0;
        state = IDLE;
@@ -404,6 +405,7 @@ final class BroadcastRecord extends Binder {
        ordered = from.ordered;
        sticky = from.sticky;
        initialSticky = from.initialSticky;
        prioritized = from.prioritized;
        userId = from.userId;
        resolvedType = from.resolvedType;
        requiredPermissions = from.requiredPermissions;
@@ -646,6 +648,23 @@ final class BroadcastRecord extends Binder {
        return (newIntent != null) ? newIntent : intent;
    }

    /**
     * Return if given receivers list has more than one traunch of priorities.
     */
    private static boolean isPrioritized(@NonNull List<Object> receivers) {
        int firstPriority = 0;
        for (int i = 0; i < receivers.size(); i++) {
            final int thisPriority = getReceiverPriority(receivers.get(i));
            if (i == 0) {
                firstPriority = thisPriority;
            } else if (thisPriority != firstPriority) {
                return true;
            }
        }
        return false;
    }


    static int getReceiverUid(@NonNull Object receiver) {
        if (receiver instanceof BroadcastFilter) {
            return ((BroadcastFilter) receiver).owningUid;
@@ -670,6 +689,14 @@ final class BroadcastRecord extends Binder {
        }
    }

    static int getReceiverPriority(@NonNull Object receiver) {
        if (receiver instanceof BroadcastFilter) {
            return ((BroadcastFilter) receiver).getPriority();
        } else /* if (receiver instanceof ResolveInfo) */ {
            return ((ResolveInfo) receiver).priority;
        }
    }

    public BroadcastRecord maybeStripForHistory() {
        if (!intent.canStripForHistory()) {
            return this;
+35 −5
Original line number Diff line number Diff line
@@ -28,6 +28,7 @@ import static com.android.server.am.BroadcastQueueTest.makeManifestReceiver;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
@@ -250,10 +251,11 @@ public class BroadcastQueueModernImplTest {
     */
    @Test
    public void testRunnableAt_Empty() {
        BroadcastProcessQueue queue = new BroadcastProcessQueue(mConstants,
        final BroadcastProcessQueue queue = new BroadcastProcessQueue(mConstants,
                PACKAGE_GREEN, getUidForPackage(PACKAGE_GREEN));
        assertFalse(queue.isRunnable());
        assertEquals(Long.MAX_VALUE, queue.getRunnableAt());
        assertEquals(ProcessList.SCHED_GROUP_UNDEFINED, queue.getPreferredSchedulingGroupLocked());
    }

    /**
@@ -262,18 +264,19 @@ public class BroadcastQueueModernImplTest {
     */
    @Test
    public void testRunnableAt_Normal() {
        BroadcastProcessQueue queue = new BroadcastProcessQueue(mConstants,
        final BroadcastProcessQueue queue = new BroadcastProcessQueue(mConstants,
                PACKAGE_GREEN, getUidForPackage(PACKAGE_GREEN));

        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane);
        queue.enqueueBroadcast(airplaneRecord, 0);
        queue.enqueueBroadcast(airplaneRecord, 0, 0);

        queue.setProcessCached(false);
        final long notCachedRunnableAt = queue.getRunnableAt();
        queue.setProcessCached(true);
        final long cachedRunnableAt = queue.getRunnableAt();
        assertTrue(cachedRunnableAt > notCachedRunnableAt);
        assertEquals(ProcessList.SCHED_GROUP_BACKGROUND, queue.getPreferredSchedulingGroupLocked());
    }

    /**
@@ -282,21 +285,48 @@ public class BroadcastQueueModernImplTest {
     */
    @Test
    public void testRunnableAt_Foreground() {
        BroadcastProcessQueue queue = new BroadcastProcessQueue(mConstants,
        final BroadcastProcessQueue queue = new BroadcastProcessQueue(mConstants,
                PACKAGE_GREEN, getUidForPackage(PACKAGE_GREEN));

        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        airplane.addFlags(Intent.FLAG_RECEIVER_FOREGROUND);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane);
        queue.enqueueBroadcast(airplaneRecord, 0);
        queue.enqueueBroadcast(airplaneRecord, 0, 0);

        queue.setProcessCached(false);
        assertTrue(queue.isRunnable());
        assertEquals(airplaneRecord.enqueueTime, queue.getRunnableAt());
        assertEquals(ProcessList.SCHED_GROUP_DEFAULT, queue.getPreferredSchedulingGroupLocked());

        queue.setProcessCached(true);
        assertTrue(queue.isRunnable());
        assertEquals(airplaneRecord.enqueueTime, queue.getRunnableAt());
        assertEquals(ProcessList.SCHED_GROUP_DEFAULT, queue.getPreferredSchedulingGroupLocked());
    }

    /**
     * Queue with ordered broadcast is runnable only once we've made enough
     * progress on earlier blocking items.
     */
    @Test
    public void testRunnableAt_Ordered() {
        final BroadcastProcessQueue queue = new BroadcastProcessQueue(mConstants,
                PACKAGE_GREEN, getUidForPackage(PACKAGE_GREEN));

        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane, null,
                List.of(makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN),
                        makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN)), true);
        queue.enqueueBroadcast(airplaneRecord, 1, 1);

        assertFalse(queue.isRunnable());
        assertEquals(BroadcastProcessQueue.REASON_BLOCKED, queue.getRunnableAtReason());

        // Bumping past barrier makes us now runnable
        airplaneRecord.terminalCount++;
        queue.invalidateRunnableAt();
        assertTrue(queue.isRunnable());
        assertNotEquals(BroadcastProcessQueue.REASON_BLOCKED, queue.getRunnableAtReason());
    }

    /**
+75 −6

File changed.

Preview size limit exceeded, changes collapsed.