Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit c401df9f authored by Jeff Sharkey's avatar Jeff Sharkey
Browse files

BroadcastQueue: add support for REPLACE_PENDING.

The "default" stack has support for FLAG_RECEIVER_REPLACE_PENDING,
which swaps any pending broadcasts with a new replacement.  If there
are no pending matches, then the broadcast is sent as normal.

This change implements this flag feature for the "modern" stack, which
is a bit more tricky due to the per-process queues.  We first mark
any per-process receivers as being "skipped", and then do any
replacement when enqueueing it later.  This is because the updated
broadcast may have dropped a previous receiver target, which we still
need to mark as "skipped" to avoid accidental delivery to that
now-stale target.

Update "default" stack to support postSyncBarrier(); the handler
used there isn't hosting UI, so it has no other sync barriers.

Bug: 245771249
Test: atest FrameworksMockingServicesTests:BroadcastQueueTest
Test: atest FrameworksMockingServicesTests:BroadcastQueueModernImplTest
Change-Id: I1c40fc62cabd7388519627d29802c3f710235009
parent 297621c0
Loading
Loading
Loading
Loading
+79 −39
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ package com.android.server.am;

import static com.android.server.am.BroadcastRecord.deliveryStateToString;
import static com.android.server.am.BroadcastRecord.isDeliveryStateTerminal;
import static com.android.server.am.BroadcastRecord.isReceiverEquals;

import android.annotation.IntDef;
import android.annotation.NonNull;
@@ -142,38 +143,47 @@ class BroadcastProcessQueue {
     * future point in time. The target receiver is indicated by the given index
     * into {@link BroadcastRecord#receivers}.
     * <p>
     * If the broadcast is marked as {@link BroadcastRecord#isReplacePending()},
     * then this call will replace any pending dispatch; otherwise it will
     * enqueue as a normal broadcast.
     * <p>
     * When defined, this receiver is considered "blocked" until at least the
     * given count of other receivers have reached a terminal state; typically
     * used for ordered broadcasts and priority traunches.
     */
    public void enqueueBroadcast(@NonNull BroadcastRecord record, int recordIndex,
    public void enqueueOrReplaceBroadcast(@NonNull BroadcastRecord record, int recordIndex,
            int blockedUntilTerminalCount) {
        // Detect situations where the incoming broadcast should cause us to
        // recalculate when we'll be runnable
        if (mPending.isEmpty()) {
            invalidateRunnableAt();
        }
        if (record.isForeground()) {
            mCountForeground++;
            invalidateRunnableAt();
        }
        if (record.ordered) {
            mCountOrdered++;
            invalidateRunnableAt();
        // If caller wants to replace, walk backwards looking for any matches
        if (record.isReplacePending()) {
            final Iterator<SomeArgs> it = mPending.descendingIterator();
            final Object receiver = record.receivers.get(recordIndex);
            while (it.hasNext()) {
                final SomeArgs args = it.next();
                final BroadcastRecord testRecord = (BroadcastRecord) args.arg1;
                final Object testReceiver = testRecord.receivers.get(args.argi1);
                if ((record.callingUid == testRecord.callingUid)
                        && (record.userId == testRecord.userId)
                        && record.intent.filterEquals(testRecord.intent)
                        && isReceiverEquals(receiver, testReceiver)) {
                    // Exact match found; perform in-place swap
                    args.arg1 = record;
                    args.argi1 = recordIndex;
                    args.argi2 = blockedUntilTerminalCount;
                    onBroadcastDequeued(testRecord);
                    onBroadcastEnqueued(record);
                    return;
                }
        if (record.alarm) {
            mCountAlarm++;
            invalidateRunnableAt();
            }
        if (record.prioritized) {
            mCountPrioritized++;
            invalidateRunnableAt();
        }

        // Caller isn't interested in replacing, or we didn't find any pending
        // item to replace above, so enqueue as a new broadcast
        SomeArgs args = SomeArgs.obtain();
        args.arg1 = record;
        args.argi1 = recordIndex;
        args.argi2 = blockedUntilTerminalCount;
        mPending.addLast(args);
        onBroadcastEnqueued(record);
    }

    /**
@@ -195,14 +205,15 @@ class BroadcastProcessQueue {
    }

    /**
     * Remove any broadcasts matching the given predicate.
     * Invoke given consumer for any broadcasts matching given predicate. If
     * requested, matching broadcasts will also be removed from this queue.
     * <p>
     * Predicates that choose to remove a broadcast <em>must</em> finish
     * delivery of the matched broadcast, to ensure that situations like ordered
     * broadcasts are handled consistently.
     */
    public boolean removeMatchingBroadcasts(@NonNull BroadcastPredicate predicate,
            @NonNull BroadcastConsumer consumer) {
    public boolean forEachMatchingBroadcast(@NonNull BroadcastPredicate predicate,
            @NonNull BroadcastConsumer consumer, boolean andRemove) {
        boolean didSomething = false;
        final Iterator<SomeArgs> it = mPending.iterator();
        while (it.hasNext()) {
@@ -211,8 +222,11 @@ class BroadcastProcessQueue {
            final int index = args.argi1;
            if (predicate.test(record, index)) {
                consumer.accept(record, index);
                if (andRemove) {
                    args.recycle();
                    it.remove();
                    onBroadcastDequeued(record);
                }
                didSomething = true;
            }
        }
@@ -282,19 +296,7 @@ class BroadcastProcessQueue {
        mActiveCountSinceIdle++;
        mActiveViaColdStart = false;
        next.recycle();
        if (mActive.isForeground()) {
            mCountForeground--;
        }
        if (mActive.ordered) {
            mCountOrdered--;
        }
        if (mActive.alarm) {
            mCountAlarm--;
        }
        if (mActive.prioritized) {
            mCountPrioritized--;
        }
        invalidateRunnableAt();
        onBroadcastDequeued(mActive);
    }

    /**
@@ -308,6 +310,44 @@ class BroadcastProcessQueue {
        invalidateRunnableAt();
    }

    /**
     * Update summary statistics when the given record has been enqueued.
     */
    private void onBroadcastEnqueued(@NonNull BroadcastRecord record) {
        if (record.isForeground()) {
            mCountForeground++;
        }
        if (record.ordered) {
            mCountOrdered++;
        }
        if (record.alarm) {
            mCountAlarm++;
        }
        if (record.prioritized) {
            mCountPrioritized++;
        }
        invalidateRunnableAt();
    }

    /**
     * Update summary statistics when the given record has been dequeued.
     */
    private void onBroadcastDequeued(@NonNull BroadcastRecord record) {
        if (record.isForeground()) {
            mCountForeground--;
        }
        if (record.ordered) {
            mCountOrdered--;
        }
        if (record.alarm) {
            mCountAlarm--;
        }
        if (record.prioritized) {
            mCountPrioritized--;
        }
        invalidateRunnableAt();
    }

    public void traceProcessStartingBegin() {
        Trace.asyncTraceForTrackBegin(Trace.TRACE_TAG_ACTIVITY_MANAGER,
                traceTrackName, toShortString() + " starting", hashCode());
+1 −1
Original line number Diff line number Diff line
@@ -163,7 +163,7 @@ public class BroadcastQueueImpl extends BroadcastQueue {

    private final class BroadcastHandler extends Handler {
        public BroadcastHandler(Looper looper) {
            super(looper, null, true);
            super(looper, null);
        }

        @Override
+38 −15
Original line number Diff line number Diff line
@@ -40,6 +40,7 @@ import static com.android.server.am.OomAdjuster.OOM_ADJ_REASON_START_RECEIVER;

import android.annotation.NonNull;
import android.annotation.Nullable;
import android.app.Activity;
import android.app.ActivityManager;
import android.app.IApplicationThread;
import android.app.RemoteServiceException.CannotDeliverBroadcastException;
@@ -463,9 +464,9 @@ class BroadcastQueueModernImpl extends BroadcastQueue {

            // Skip any pending registered receivers, since the old process
            // would never be around to receive them
            boolean didSomething = queue.removeMatchingBroadcasts((r, i) -> {
            boolean didSomething = queue.forEachMatchingBroadcast((r, i) -> {
                return (r.receivers.get(i) instanceof BroadcastFilter);
            }, mBroadcastConsumerSkip);
            }, mBroadcastConsumerSkip, true);
            if (didSomething || queue.isEmpty()) {
                updateRunnableList(queue);
                enqueueUpdateRunningList();
@@ -490,11 +491,23 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
                ? r.options.getRemoveMatchingFilter() : null;
        if (removeMatchingFilter != null) {
            final Predicate<Intent> removeMatching = removeMatchingFilter.asPredicate();
            skipMatchingBroadcasts(QUEUE_PREDICATE_ANY, (testRecord, testReceiver) -> {
                // We only allow caller to clear broadcasts they enqueued
                return (testRecord.callingUid == r.callingUid)
            forEachMatchingBroadcast(QUEUE_PREDICATE_ANY, (testRecord, testIndex) -> {
                // We only allow caller to remove broadcasts they enqueued
                return (r.callingUid == testRecord.callingUid)
                        && (r.userId == testRecord.userId)
                        && removeMatching.test(testRecord.intent);
            });
            }, mBroadcastConsumerSkipAndCanceled, true);
        }

        if (r.isReplacePending()) {
            // Leave the skipped broadcasts intact in queue, so that we can
            // replace them at their current position during enqueue below
            forEachMatchingBroadcast(QUEUE_PREDICATE_ANY, (testRecord, testIndex) -> {
                // We only allow caller to replace broadcasts they enqueued
                return (r.callingUid == testRecord.callingUid)
                        && (r.userId == testRecord.userId)
                        && r.intent.filterEquals(testRecord.intent);
            }, mBroadcastConsumerSkipAndCanceled, false);
        }

        r.enqueueTime = SystemClock.uptimeMillis();
@@ -530,7 +543,7 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
                blockedUntilTerminalCount = 0;
            }

            queue.enqueueBroadcast(r, i, blockedUntilTerminalCount);
            queue.enqueueOrReplaceBroadcast(r, i, blockedUntilTerminalCount);
            updateRunnableList(queue);
            enqueueUpdateRunningList();
        }
@@ -912,7 +925,8 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
            };
            broadcastPredicate = BROADCAST_PREDICATE_ANY;
        }
        return skipMatchingBroadcasts(queuePredicate, broadcastPredicate);
        return forEachMatchingBroadcast(queuePredicate, broadcastPredicate,
                mBroadcastConsumerSkip, true);
    }

    private static final Predicate<BroadcastProcessQueue> QUEUE_PREDICATE_ANY =
@@ -928,19 +942,28 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
        setDeliveryState(null, null, r, i, r.receivers.get(i), BroadcastRecord.DELIVERY_SKIPPED);
    };

    private boolean skipMatchingBroadcasts(
    /**
     * Typical consumer that will both skip the given broadcast and mark it as
     * cancelled, usually as a result of it matching a predicate.
     */
    private final BroadcastConsumer mBroadcastConsumerSkipAndCanceled = (r, i) -> {
        setDeliveryState(null, null, r, i, r.receivers.get(i), BroadcastRecord.DELIVERY_SKIPPED);
        r.resultCode = Activity.RESULT_CANCELED;
        r.resultData = null;
        r.resultExtras = null;
    };

    private boolean forEachMatchingBroadcast(
            @NonNull Predicate<BroadcastProcessQueue> queuePredicate,
            @NonNull BroadcastPredicate broadcastPredicate) {
        // Note that we carefully preserve any "skipped" broadcasts in their
        // queues so that we follow our normal flow for "finishing" a broadcast,
        // which is where we handle things like ordered broadcasts.
            @NonNull BroadcastPredicate broadcastPredicate,
            @NonNull BroadcastConsumer broadcastConsumer, boolean andRemove) {
        boolean didSomething = false;
        for (int i = 0; i < mProcessQueues.size(); i++) {
            BroadcastProcessQueue leaf = mProcessQueues.valueAt(i);
            while (leaf != null) {
                if (queuePredicate.test(leaf)) {
                    if (leaf.removeMatchingBroadcasts(broadcastPredicate,
                            mBroadcastConsumerSkip)) {
                    if (leaf.forEachMatchingBroadcast(broadcastPredicate,
                            broadcastConsumer, andRemove)) {
                        updateRunnableList(leaf);
                        didSomething = true;
                    }
+14 −0
Original line number Diff line number Diff line
@@ -59,6 +59,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiFunction;
@@ -697,6 +698,19 @@ final class BroadcastRecord extends Binder {
        }
    }

    static boolean isReceiverEquals(@NonNull Object a, @NonNull Object b) {
        if (a == b) {
            return true;
        } else if (a instanceof ResolveInfo && b instanceof ResolveInfo) {
            final ResolveInfo infoA = (ResolveInfo) a;
            final ResolveInfo infoB = (ResolveInfo) b;
            return Objects.equals(infoA.activityInfo.packageName, infoB.activityInfo.packageName)
                    && Objects.equals(infoA.activityInfo.name, infoB.activityInfo.name);
        } else {
            return false;
        }
    }

    public BroadcastRecord maybeStripForHistory() {
        if (!intent.canStripForHistory()) {
            return this;
+3 −3
Original line number Diff line number Diff line
@@ -269,7 +269,7 @@ public class BroadcastQueueModernImplTest {

        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane);
        queue.enqueueBroadcast(airplaneRecord, 0, 0);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0, 0);

        queue.setProcessCached(false);
        final long notCachedRunnableAt = queue.getRunnableAt();
@@ -291,7 +291,7 @@ public class BroadcastQueueModernImplTest {
        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        airplane.addFlags(Intent.FLAG_RECEIVER_FOREGROUND);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane);
        queue.enqueueBroadcast(airplaneRecord, 0, 0);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0, 0);

        queue.setProcessCached(false);
        assertTrue(queue.isRunnable());
@@ -317,7 +317,7 @@ public class BroadcastQueueModernImplTest {
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane, null,
                List.of(makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN),
                        makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN)), true);
        queue.enqueueBroadcast(airplaneRecord, 1, 1);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 1, 1);

        assertFalse(queue.isRunnable());
        assertEquals(BroadcastProcessQueue.REASON_BLOCKED, queue.getRunnableAtReason());
Loading