Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0b35f9df authored by Christopher Tate's avatar Christopher Tate
Browse files

Prioritize some broadcasts ahead of strict FIFO

User-interactive event flow and broadcast priority classes both suffer
from priority inversion issues with strict FIFO dispatch policy:
existing queued deliveries might require significant time to complete
dispatch, with the resulting delivery latency of the interactive or
foreground-priority messages leading to poor user experience.  To
address this issue, we introduce urgency-base queueing policy, with
"urgent" broadcasts placed ahead of ordinary ones in the recipients'
pending queues.  A broadcast is currently considered urgent when:

* the sender has marked it FLAG_RECEIVER_FOREGROUND, or
* the sender has marked it as "interactive", or
* the broadcast was issued by an alarm triggering

AND

* the broadcast is only going to a single receiver

That final restriction prevents certain kinds of interference with
ordered broadcast delivery and completion semantics.

"interactive" is a new BroadcastOption applied when the broadcast
issuance is in response to a direct user interaction, i.e. when
timeliness of delivery is directly perceived as a responsiveness issue
by the user.  The canonical example of this is notification
interactions, though perhaps the concept might usefully be applied to
scenarios such as high-priority push message delivery.  For now its use
is restricted to system-internal callers.

Finite capacity for dispatch parallelism can currently cause priority
inversions even with the new queueing policy if all available running
slots are occupied with background-priority deliveries.  Addressing
this will happen in a follow-on CL.

Bug: 251902289
Test: atest FrameworksMockingServicesTests:BroadcastQueueModernImplTest
Test: atest FrameworksMockingServicesTests:BroadcastQueueTest
Change-Id: I47813c2c6ca43559f39d701f104fa62527c7bc21
parent 9a945ac1
Loading
Loading
Loading
Loading
+33 −0
Original line number Diff line number Diff line
@@ -57,6 +57,7 @@ public class BroadcastOptions extends ComponentOptions {
    private long mRequireCompatChangeId = CHANGE_INVALID;
    private boolean mRequireCompatChangeEnabled = true;
    private boolean mIsAlarmBroadcast = false;
    private boolean mIsInteractiveBroadcast = false;
    private long mIdForResponseEvent;
    private @Nullable IntentFilter mRemoveMatchingFilter;

@@ -160,6 +161,13 @@ public class BroadcastOptions extends ComponentOptions {
    public static final String KEY_ALARM_BROADCAST =
            "android:broadcast.is_alarm";

    /**
     * Corresponds to {@link #setInteractiveBroadcast(boolean)}
     * @hide
     */
    public static final String KEY_INTERACTIVE_BROADCAST =
            "android:broadcast.is_interactive";

    /**
     * @hide
     * @deprecated Use {@link android.os.PowerExemptionManager#
@@ -234,6 +242,7 @@ public class BroadcastOptions extends ComponentOptions {
        mRequireCompatChangeEnabled = opts.getBoolean(KEY_REQUIRE_COMPAT_CHANGE_ENABLED, true);
        mIdForResponseEvent = opts.getLong(KEY_ID_FOR_RESPONSE_EVENT);
        mIsAlarmBroadcast = opts.getBoolean(KEY_ALARM_BROADCAST, false);
        mIsInteractiveBroadcast = opts.getBoolean(KEY_INTERACTIVE_BROADCAST, false);
        mRemoveMatchingFilter = opts.getParcelable(KEY_REMOVE_MATCHING_FILTER,
                IntentFilter.class);
    }
@@ -548,6 +557,27 @@ public class BroadcastOptions extends ComponentOptions {
        return mIsAlarmBroadcast;
    }

    /**
     * When set, this broadcast will be understood as having originated from
     * some direct interaction by the user such as a notification tap or button
     * press.  Only the OS itself may use this option.
     * @hide
     * @param broadcastIsInteractive
     * @see #isInteractiveBroadcast()
     */
    public void setInteractiveBroadcast(boolean broadcastIsInteractive) {
        mIsInteractiveBroadcast = broadcastIsInteractive;
    }

    /**
     * Did this broadcast originate with a direct user interaction?
     * @return true if this broadcast is the result of an interaction, false otherwise
     * @hide
     */
    public boolean isInteractiveBroadcast() {
        return mIsInteractiveBroadcast;
    }

    /**
     * Did this broadcast originate from a push message from the server?
     *
@@ -658,6 +688,9 @@ public class BroadcastOptions extends ComponentOptions {
        if (mIsAlarmBroadcast) {
            b.putBoolean(KEY_ALARM_BROADCAST, true);
        }
        if (mIsInteractiveBroadcast) {
            b.putBoolean(KEY_INTERACTIVE_BROADCAST, true);
        }
        if (mMinManifestReceiverApiLevel != 0) {
            b.putInt(KEY_MIN_MANIFEST_RECEIVER_API_LEVEL, mMinManifestReceiverApiLevel);
        }
+4 −3
Original line number Diff line number Diff line
@@ -14683,13 +14683,14 @@ public class ActivityManagerService extends IActivityManager.Stub
            // Non-system callers can't declare that a broadcast is alarm-related.
            // The PendingIntent invocation case is handled in PendingIntentRecord.
            if (bOptions != null && callingUid != SYSTEM_UID) {
                if (bOptions.containsKey(BroadcastOptions.KEY_ALARM_BROADCAST)) {
                if (bOptions.containsKey(BroadcastOptions.KEY_ALARM_BROADCAST)
                        || bOptions.containsKey(BroadcastOptions.KEY_INTERACTIVE_BROADCAST)) {
                    if (DEBUG_BROADCAST) {
                        Slog.w(TAG, "Non-system caller " + callingUid
                                + " may not flag broadcast as alarm-related");
                                + " may not flag broadcast as alarm or interactive");
                    }
                    throw new SecurityException(
                            "Non-system callers may not flag broadcasts as alarm-related");
                            "Non-system callers may not flag broadcasts as alarm or interactive");
                }
            }
+112 −36
Original line number Diff line number Diff line
@@ -102,6 +102,13 @@ class BroadcastProcessQueue {
     */
    private final ArrayDeque<SomeArgs> mPending = new ArrayDeque<>();

    /**
     * Ordered collection of "urgent" broadcasts that are waiting to be
     * dispatched to this process, in the same representation as
     * {@link #mPending}.
     */
    private final ArrayDeque<SomeArgs> mPendingUrgent = new ArrayDeque<>();

    /**
     * Broadcast actively being dispatched to this process.
     */
@@ -164,9 +171,43 @@ class BroadcastProcessQueue {
     */
    public void enqueueOrReplaceBroadcast(@NonNull BroadcastRecord record, int recordIndex,
            int blockedUntilTerminalCount) {
        // If caller wants to replace, walk backwards looking for any matches
        if (record.isReplacePending()) {
            final Iterator<SomeArgs> it = mPending.descendingIterator();
            boolean didReplace = replaceBroadcastInQueue(mPending,
                    record, recordIndex, blockedUntilTerminalCount)
                    || replaceBroadcastInQueue(mPendingUrgent,
                    record, recordIndex, blockedUntilTerminalCount);
            if (didReplace) {
                return;
            }
        }

        // Caller isn't interested in replacing, or we didn't find any pending
        // item to replace above, so enqueue as a new broadcast
        SomeArgs newBroadcastArgs = SomeArgs.obtain();
        newBroadcastArgs.arg1 = record;
        newBroadcastArgs.argi1 = recordIndex;
        newBroadcastArgs.argi2 = blockedUntilTerminalCount;

        // Cross-broadcast prioritization policy:  some broadcasts might warrant being
        // issued ahead of others that are already pending, for example if this new
        // broadcast is in a different delivery class or is tied to a direct user interaction
        // with implicit responsiveness expectations.
        final ArrayDeque<SomeArgs> queue = record.isUrgent() ? mPendingUrgent : mPending;
        queue.addLast(newBroadcastArgs);
        onBroadcastEnqueued(record);
    }

    /**
     * Searches from newest to oldest, and at the first matching pending broadcast
     * it finds, replaces it in-place and returns -- does not attempt to handle
     * "duplicate" broadcasts in the queue.
     * <p>
     * @return {@code true} if it found and replaced an existing record in the queue;
     * {@code false} otherwise.
     */
    private boolean replaceBroadcastInQueue(@NonNull ArrayDeque<SomeArgs> queue,
            @NonNull BroadcastRecord record, int recordIndex,  int blockedUntilTerminalCount) {
        final Iterator<SomeArgs> it = queue.descendingIterator();
        final Object receiver = record.receivers.get(recordIndex);
        while (it.hasNext()) {
            final SomeArgs args = it.next();
@@ -182,19 +223,10 @@ class BroadcastProcessQueue {
                args.argi2 = blockedUntilTerminalCount;
                onBroadcastDequeued(testRecord);
                onBroadcastEnqueued(record);
                    return;
                return true;
            }
        }
        }

        // Caller isn't interested in replacing, or we didn't find any pending
        // item to replace above, so enqueue as a new broadcast
        SomeArgs args = SomeArgs.obtain();
        args.arg1 = record;
        args.argi1 = recordIndex;
        args.argi2 = blockedUntilTerminalCount;
        mPending.addLast(args);
        onBroadcastEnqueued(record);
        return false;
    }

    /**
@@ -225,8 +257,18 @@ class BroadcastProcessQueue {
     */
    public boolean forEachMatchingBroadcast(@NonNull BroadcastPredicate predicate,
            @NonNull BroadcastConsumer consumer, boolean andRemove) {
        boolean didSomething = forEachMatchingBroadcastInQueue(mPending,
                predicate, consumer, andRemove);
        didSomething |= forEachMatchingBroadcastInQueue(mPendingUrgent,
                predicate, consumer, andRemove);
        return didSomething;
    }

    private boolean forEachMatchingBroadcastInQueue(@NonNull ArrayDeque<SomeArgs> queue,
            @NonNull BroadcastPredicate predicate, @NonNull BroadcastConsumer consumer,
            boolean andRemove) {
        boolean didSomething = false;
        final Iterator<SomeArgs> it = mPending.iterator();
        final Iterator<SomeArgs> it = queue.iterator();
        while (it.hasNext()) {
            final SomeArgs args = it.next();
            final BroadcastRecord record = (BroadcastRecord) args.arg1;
@@ -301,7 +343,7 @@ class BroadcastProcessQueue {
     */
    public void makeActiveNextPending() {
        // TODO: what if the next broadcast isn't runnable yet?
        final SomeArgs next = mPending.removeFirst();
        final SomeArgs next = removeNextBroadcast();
        mActive = (BroadcastRecord) next.arg1;
        mActiveIndex = next.argi1;
        mActiveCountSinceIdle++;
@@ -403,13 +445,45 @@ class BroadcastProcessQueue {
    }

    public boolean isEmpty() {
        return mPending.isEmpty();
        return mPending.isEmpty() && mPendingUrgent.isEmpty();
    }

    public boolean isActive() {
        return mActive != null;
    }

    /**
     * Will thrown an exception if there are no pending broadcasts; relies on
     * {@link #isEmpty()} being false.
     */
    SomeArgs removeNextBroadcast() {
        ArrayDeque<SomeArgs> queue = queueForNextBroadcast();
        return queue.removeFirst();
    }

    @Nullable ArrayDeque<SomeArgs> queueForNextBroadcast() {
        if (!mPendingUrgent.isEmpty()) {
            return mPendingUrgent;
        } else if (!mPending.isEmpty()) {
            return mPending;
        }
        return null;
    }

    /**
     * Returns null if there are no pending broadcasts
     */
    @Nullable SomeArgs peekNextBroadcast() {
        ArrayDeque<SomeArgs> queue = queueForNextBroadcast();
        return (queue != null) ? queue.peekFirst() : null;
    }

    @VisibleForTesting
    @Nullable BroadcastRecord peekNextBroadcastRecord() {
        ArrayDeque<SomeArgs> queue = queueForNextBroadcast();
        return (queue != null) ? (BroadcastRecord) queue.peekFirst().arg1 : null;
    }

    /**
     * Quickly determine if this queue has broadcasts that are still waiting to
     * be delivered at some point in the future.
@@ -427,11 +501,13 @@ class BroadcastProcessQueue {
            return mActive.enqueueTime > barrierTime;
        }
        final SomeArgs next = mPending.peekFirst();
        if (next != null) {
            return ((BroadcastRecord) next.arg1).enqueueTime > barrierTime;
        }
        // Nothing running or runnable means we're past the barrier
        return true;
        final SomeArgs nextUrgent = mPendingUrgent.peekFirst();
        // Empty queue is past any barrier
        final boolean nextLater = next == null
                || ((BroadcastRecord) next.arg1).enqueueTime > barrierTime;
        final boolean nextUrgentLater = nextUrgent == null
                || ((BroadcastRecord) nextUrgent.arg1).enqueueTime > barrierTime;
        return nextLater && nextUrgentLater;
    }

    public boolean isRunnable() {
@@ -509,7 +585,7 @@ class BroadcastProcessQueue {
     * Update {@link #getRunnableAt()} if it's currently invalidated.
     */
    private void updateRunnableAt() {
        final SomeArgs next = mPending.peekFirst();
        final SomeArgs next = peekNextBroadcast();
        if (next != null) {
            final BroadcastRecord r = (BroadcastRecord) next.arg1;
            final int index = next.argi1;
@@ -527,7 +603,7 @@ class BroadcastProcessQueue {

            // If we have too many broadcasts pending, bypass any delays that
            // might have been applied above to aid draining
            if (mPending.size() >= constants.MAX_PENDING_BROADCASTS) {
            if (mPending.size() + mPendingUrgent.size() >= constants.MAX_PENDING_BROADCASTS) {
                mRunnableAt = runnableAt;
                mRunnableAtReason = REASON_MAX_PENDING;
                return;
@@ -564,8 +640,8 @@ class BroadcastProcessQueue {
     */
    public void checkHealthLocked() {
        if (mRunnableAtReason == REASON_BLOCKED) {
            final SomeArgs next = mPending.peekFirst();
            Objects.requireNonNull(next, "peekFirst");
            final SomeArgs next = peekNextBroadcast();
            Objects.requireNonNull(next, "peekNextBroadcast");

            // If blocked more than 10 minutes, we're likely wedged
            final BroadcastRecord r = (BroadcastRecord) next.arg1;
+15 −0
Original line number Diff line number Diff line
@@ -83,6 +83,7 @@ final class BroadcastRecord extends Binder {
    final boolean alarm;    // originated from an alarm triggering?
    final boolean pushMessage; // originated from a push message?
    final boolean pushMessageOverQuota; // originated from a push message which was over quota?
    final boolean interactive; // originated from user interaction?
    final boolean initialSticky; // initial broadcast from register to sticky?
    final boolean prioritized; // contains more than one priority tranche
    final int userId;       // user id this broadcast was for
@@ -389,6 +390,7 @@ final class BroadcastRecord extends Binder {
        alarm = options != null && options.isAlarmBroadcast();
        pushMessage = options != null && options.isPushMessagingBroadcast();
        pushMessageOverQuota = options != null && options.isPushMessagingOverQuotaBroadcast();
        interactive = options != null && options.isInteractiveBroadcast();
        this.filterExtrasForReceiver = filterExtrasForReceiver;
    }

@@ -446,6 +448,7 @@ final class BroadcastRecord extends Binder {
        alarm = from.alarm;
        pushMessage = from.pushMessage;
        pushMessageOverQuota = from.pushMessageOverQuota;
        interactive = from.interactive;
        filterExtrasForReceiver = from.filterExtrasForReceiver;
    }

@@ -607,6 +610,18 @@ final class BroadcastRecord extends Binder {
        return (intent.getFlags() & Intent.FLAG_RECEIVER_NO_ABORT) != 0;
    }

    /**
     * Core policy determination about this broadcast's delivery prioritization
     */
    boolean isUrgent() {
        // TODO: flags for controlling policy
        // TODO: migrate alarm-prioritization flag to BroadcastConstants
        return (isForeground()
                || interactive
                || alarm)
                && receivers.size() == 1;
    }

    @NonNull String getHostingRecordTriggerType() {
        if (alarm) {
            return HostingRecord.TRIGGER_TYPE_ALARM;
+4 −3
Original line number Diff line number Diff line
@@ -424,13 +424,14 @@ public final class PendingIntentRecord extends IIntentSender.Stub {
        // invocation side effects such as allowlisting.
        if (options != null && callingUid != Process.SYSTEM_UID
                && key.type == ActivityManager.INTENT_SENDER_BROADCAST) {
            if (options.containsKey(BroadcastOptions.KEY_ALARM_BROADCAST)) {
            if (options.containsKey(BroadcastOptions.KEY_ALARM_BROADCAST)
                    || options.containsKey(BroadcastOptions.KEY_INTERACTIVE_BROADCAST)) {
                if (DEBUG_BROADCAST_LIGHT) {
                    Slog.w(TAG, "Non-system caller " + callingUid
                            + " may not flag broadcast as alarm-related");
                            + " may not flag broadcast as alarm or interactive");
                }
                throw new SecurityException(
                        "Non-system callers may not flag broadcasts as alarm-related");
                        "Non-system callers may not flag broadcasts as alarm or interactive");
            }
        }

Loading