Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 684287d4 authored by Sudheer Shanka's avatar Sudheer Shanka
Browse files

Don't replace any broadcasts that already have been processed.

We do in-place replacement of broadcasts when enqueueing broadcasts
sent with FLAG_RECEIVER_REPLACE_PENDING and this could cause an
inversion in the broadcast queues if we replace a broadcast record
which is already delivered to some of the receivers. This is because
we will end up replacing the receivers which are still waiting and
the rest will be enqueued at the end of the broadcast queue which
changes the ordering in which these receivers should be receiving
the broadcast.

Bug: 261822892
Test: atest services/tests/mockingservicestests/src/com/android/server/am/BroadcastQueueTest.java
Test: atest services/tests/mockingservicestests/src/com/android/server/am/BroadcastQueueModernImplTest.java
Change-Id: I4c8b013a07e3b61ace32f4379cc644dbf5347371
parent 09183fb1
Loading
Loading
Loading
Loading
+36 −6
Original line number Diff line number Diff line
@@ -42,6 +42,7 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.util.ArrayDeque;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

/**
@@ -123,6 +124,12 @@ class BroadcastProcessQueue {
     */
    private final ArrayDeque<SomeArgs> mPendingOffload = new ArrayDeque<>(4);

    /**
     * List of all queues holding broadcasts that are waiting to be dispatched.
     */
    private final List<ArrayDeque<SomeArgs>> mPendingQueues = List.of(
            mPendingUrgent, mPending, mPendingOffload);

    /**
     * Broadcast actively being dispatched to this process.
     */
@@ -218,11 +225,11 @@ class BroadcastProcessQueue {
     * given count of other receivers have reached a terminal state; typically
     * used for ordered broadcasts and priority traunches.
     */
    public void enqueueOrReplaceBroadcast(@NonNull BroadcastRecord record, int recordIndex) {
    public void enqueueOrReplaceBroadcast(@NonNull BroadcastRecord record, int recordIndex,
            @NonNull BroadcastConsumer replacedBroadcastConsumer) {
        if (record.isReplacePending()) {
            boolean didReplace = replaceBroadcastInQueue(mPending, record, recordIndex)
                    || replaceBroadcastInQueue(mPendingUrgent, record, recordIndex)
                    || replaceBroadcastInQueue(mPendingOffload, record, recordIndex);
            final boolean didReplace = replaceBroadcast(record, recordIndex,
                    replacedBroadcastConsumer);
            if (didReplace) {
                return;
            }
@@ -242,6 +249,26 @@ class BroadcastProcessQueue {
        onBroadcastEnqueued(record, recordIndex);
    }

    /**
     * Searches from newest to oldest in the pending broadcast queues, and at the first matching
     * pending broadcast it finds, replaces it in-place and returns -- does not attempt to handle
     * "duplicate" broadcasts in the queue.
     * <p>
     * @return {@code true} if it found and replaced an existing record in the queue;
     * {@code false} otherwise.
     */
    private boolean replaceBroadcast(@NonNull BroadcastRecord record, int recordIndex,
            @NonNull BroadcastConsumer replacedBroadcastConsumer) {
        final int count = mPendingQueues.size();
        for (int i = 0; i < count; ++i) {
            final ArrayDeque<SomeArgs> queue = mPendingQueues.get(i);
            if (replaceBroadcastInQueue(queue, record, recordIndex, replacedBroadcastConsumer)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Searches from newest to oldest, and at the first matching pending broadcast
     * it finds, replaces it in-place and returns -- does not attempt to handle
@@ -251,7 +278,8 @@ class BroadcastProcessQueue {
     * {@code false} otherwise.
     */
    private boolean replaceBroadcastInQueue(@NonNull ArrayDeque<SomeArgs> queue,
            @NonNull BroadcastRecord record, int recordIndex) {
            @NonNull BroadcastRecord record, int recordIndex,
            @NonNull BroadcastConsumer replacedBroadcastConsumer) {
        final Iterator<SomeArgs> it = queue.descendingIterator();
        final Object receiver = record.receivers.get(recordIndex);
        while (it.hasNext()) {
@@ -262,12 +290,14 @@ class BroadcastProcessQueue {
            if ((record.callingUid == testRecord.callingUid)
                    && (record.userId == testRecord.userId)
                    && record.intent.filterEquals(testRecord.intent)
                    && isReceiverEquals(receiver, testReceiver)) {
                    && isReceiverEquals(receiver, testReceiver)
                    && testRecord.allReceiversPending()) {
                // Exact match found; perform in-place swap
                args.arg1 = record;
                args.argi1 = recordIndex;
                onBroadcastDequeued(testRecord, testRecordIndex);
                onBroadcastEnqueued(record, recordIndex);
                replacedBroadcastConsumer.accept(testRecord, testRecordIndex);
                return true;
            }
        }
+20 −12
Original line number Diff line number Diff line
@@ -64,6 +64,7 @@ import android.os.RemoteException;
import android.os.SystemClock;
import android.os.UserHandle;
import android.text.format.DateUtils;
import android.util.ArraySet;
import android.util.IndentingPrintWriter;
import android.util.MathUtils;
import android.util.Pair;
@@ -629,30 +630,26 @@ class BroadcastQueueModernImpl extends BroadcastQueue {

        applyDeliveryGroupPolicy(r);

        if (r.isReplacePending()) {
            // Leave the skipped broadcasts intact in queue, so that we can
            // replace them at their current position during enqueue below
            forEachMatchingBroadcast(QUEUE_PREDICATE_ANY, (testRecord, testIndex) -> {
                // We only allow caller to replace broadcasts they enqueued
                return (r.callingUid == testRecord.callingUid)
                        && (r.userId == testRecord.userId)
                        && r.intent.filterEquals(testRecord.intent);
            }, mBroadcastConsumerSkipAndCanceled, false);
        }

        r.enqueueTime = SystemClock.uptimeMillis();
        r.enqueueRealTime = SystemClock.elapsedRealtime();
        r.enqueueClockTime = System.currentTimeMillis();

        final ArraySet<BroadcastRecord> replacedBroadcasts = new ArraySet<>();
        final BroadcastConsumer replacedBroadcastConsumer =
                (record, i) -> replacedBroadcasts.add(record);
        for (int i = 0; i < r.receivers.size(); i++) {
            final Object receiver = r.receivers.get(i);
            final BroadcastProcessQueue queue = getOrCreateProcessQueue(
                    getReceiverProcessName(receiver), getReceiverUid(receiver));
            queue.enqueueOrReplaceBroadcast(r, i);
            queue.enqueueOrReplaceBroadcast(r, i, replacedBroadcastConsumer);
            updateRunnableList(queue);
            enqueueUpdateRunningList();
        }

        // Skip any broadcasts that have been replaced by newer broadcasts with
        // FLAG_RECEIVER_REPLACE_PENDING.
        skipAndCancelReplacedBroadcasts(replacedBroadcasts);

        // If nothing to dispatch, send any pending result immediately
        if (r.receivers.isEmpty()) {
            scheduleResultTo(r);
@@ -662,6 +659,17 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
        traceEnd(cookie);
    }

    private void skipAndCancelReplacedBroadcasts(ArraySet<BroadcastRecord> replacedBroadcasts) {
        for (int i = 0; i < replacedBroadcasts.size(); ++i) {
            final BroadcastRecord r = replacedBroadcasts.valueAt(i);
            r.resultCode = Activity.RESULT_CANCELED;
            r.resultData = null;
            r.resultExtras = null;
            scheduleResultTo(r);
            notifyFinishBroadcast(r);
        }
    }

    private void applyDeliveryGroupPolicy(@NonNull BroadcastRecord r) {
        if (mService.shouldIgnoreDeliveryGroupPolicy(r.intent.getAction())) {
            return;
+11 −0
Original line number Diff line number Diff line
@@ -907,6 +907,17 @@ final class BroadcastRecord extends Binder {
        return record.options == null ? null : record.options.getDeliveryGroupMatchingFilter();
    }

    /**
     * Returns {@code true} if all the receivers are still waiting to receive the broadcast.
     * Otherwise {@code false}.
     */
    boolean allReceiversPending() {
        // We could also count the number of receivers with deliver state DELIVERY_PENDING, but
        // checking how many receivers have finished (either skipped or cancelled) and whether or
        // not the dispatch has been started should be sufficient.
        return (terminalCount == 0 && dispatchTime <= 0);
    }

    @Override
    public String toString() {
        if (mCachedToString == null) {
+20 −14
Original line number Diff line number Diff line
@@ -197,7 +197,7 @@ public class BroadcastQueueModernImplTest {

    private void enqueueOrReplaceBroadcast(BroadcastProcessQueue queue,
            BroadcastRecord record, int recordIndex, long enqueueTime) {
        queue.enqueueOrReplaceBroadcast(record, recordIndex);
        queue.enqueueOrReplaceBroadcast(record, recordIndex, null /* replacedBroadcastConsumer */);
        record.enqueueTime = enqueueTime;
    }

@@ -327,7 +327,7 @@ public class BroadcastQueueModernImplTest {
        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane,
                List.of(makeMockRegisteredReceiver()));
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0, null /* replacedBroadcastConsumer */);

        queue.setProcessCached(false);
        final long notCachedRunnableAt = queue.getRunnableAt();
@@ -349,12 +349,12 @@ public class BroadcastQueueModernImplTest {
        // enqueue a bg-priority broadcast then a fg-priority one
        final Intent timezone = new Intent(Intent.ACTION_TIMEZONE_CHANGED);
        final BroadcastRecord timezoneRecord = makeBroadcastRecord(timezone);
        queue.enqueueOrReplaceBroadcast(timezoneRecord, 0);
        queue.enqueueOrReplaceBroadcast(timezoneRecord, 0, null /* replacedBroadcastConsumer */);

        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        airplane.addFlags(Intent.FLAG_RECEIVER_FOREGROUND);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0, null /* replacedBroadcastConsumer */);

        // verify that:
        // (a) the queue is immediately runnable by existence of a fg-priority broadcast
@@ -385,7 +385,7 @@ public class BroadcastQueueModernImplTest {
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane, null,
                List.of(withPriority(makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN), 10),
                        withPriority(makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN), 0)), true);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 1);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 1, null /* replacedBroadcastConsumer */);

        assertFalse(queue.isRunnable());
        assertEquals(BroadcastProcessQueue.REASON_BLOCKED, queue.getRunnableAtReason());
@@ -408,7 +408,7 @@ public class BroadcastQueueModernImplTest {
        final Intent airplane = new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED);
        final BroadcastRecord airplaneRecord = makeBroadcastRecord(airplane,
                List.of(makeMockRegisteredReceiver()));
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0);
        queue.enqueueOrReplaceBroadcast(airplaneRecord, 0, null /* replacedBroadcastConsumer */);

        mConstants.MAX_PENDING_BROADCASTS = 128;
        queue.invalidateRunnableAt();
@@ -434,11 +434,11 @@ public class BroadcastQueueModernImplTest {
                new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED),
                List.of(makeMockRegisteredReceiver()));

        queue.enqueueOrReplaceBroadcast(lazyRecord, 0);
        queue.enqueueOrReplaceBroadcast(lazyRecord, 0, null /* replacedBroadcastConsumer */);
        assertThat(queue.getRunnableAt()).isGreaterThan(lazyRecord.enqueueTime);
        assertThat(queue.getRunnableAtReason()).isNotEqualTo(testRunnableAtReason);

        queue.enqueueOrReplaceBroadcast(testRecord, 0);
        queue.enqueueOrReplaceBroadcast(testRecord, 0, null /* replacedBroadcastConsumer */);
        assertThat(queue.getRunnableAt()).isAtMost(testRecord.enqueueTime);
        assertThat(queue.getRunnableAtReason()).isEqualTo(testRunnableAtReason);
    }
@@ -507,20 +507,26 @@ public class BroadcastQueueModernImplTest {

        queue.enqueueOrReplaceBroadcast(
                makeBroadcastRecord(new Intent(Intent.ACTION_AIRPLANE_MODE_CHANGED)
                        .addFlags(Intent.FLAG_RECEIVER_OFFLOAD)), 0);
                        .addFlags(Intent.FLAG_RECEIVER_OFFLOAD)), 0,
                null /* replacedBroadcastConsumer */);
        queue.enqueueOrReplaceBroadcast(
                makeBroadcastRecord(new Intent(Intent.ACTION_TIMEZONE_CHANGED)), 0);
                makeBroadcastRecord(new Intent(Intent.ACTION_TIMEZONE_CHANGED)), 0,
                null /* replacedBroadcastConsumer */);
        queue.enqueueOrReplaceBroadcast(
                makeBroadcastRecord(new Intent(Intent.ACTION_LOCKED_BOOT_COMPLETED)
                        .addFlags(Intent.FLAG_RECEIVER_FOREGROUND)), 0);
                        .addFlags(Intent.FLAG_RECEIVER_FOREGROUND)), 0,
                null /* replacedBroadcastConsumer */);
        queue.enqueueOrReplaceBroadcast(
                makeBroadcastRecord(new Intent(Intent.ACTION_ALARM_CHANGED)
                        .addFlags(Intent.FLAG_RECEIVER_OFFLOAD)), 0);
                        .addFlags(Intent.FLAG_RECEIVER_OFFLOAD)), 0,
                null /* replacedBroadcastConsumer */);
        queue.enqueueOrReplaceBroadcast(
                makeBroadcastRecord(new Intent(Intent.ACTION_TIME_TICK)), 0);
                makeBroadcastRecord(new Intent(Intent.ACTION_TIME_TICK)), 0,
                null /* replacedBroadcastConsumer */);
        queue.enqueueOrReplaceBroadcast(
                makeBroadcastRecord(new Intent(Intent.ACTION_LOCALE_CHANGED)
                        .addFlags(Intent.FLAG_RECEIVER_FOREGROUND)), 0);
                        .addFlags(Intent.FLAG_RECEIVER_FOREGROUND)), 0,
                null /* replacedBroadcastConsumer */);

        queue.makeActiveNextPending();
        assertEquals(Intent.ACTION_LOCKED_BOOT_COMPLETED, queue.getActive().intent.getAction());
+29 −7
Original line number Diff line number Diff line
@@ -154,6 +154,7 @@ public class BroadcastQueueTest {

    private ActivityManagerService mAms;
    private BroadcastQueue mQueue;
    BroadcastConstants mConstants;

    /**
     * Desired behavior of the next
@@ -277,10 +278,9 @@ public class BroadcastQueueTest {
        }).when(mAms).getProcessRecordLocked(any(), anyInt());
        doNothing().when(mAms).appNotResponding(any(), any());

        final BroadcastConstants constants = new BroadcastConstants(
                Settings.Global.BROADCAST_FG_CONSTANTS);
        constants.TIMEOUT = 100;
        constants.ALLOW_BG_ACTIVITY_START_TIMEOUT = 0;
        mConstants = new BroadcastConstants(Settings.Global.BROADCAST_FG_CONSTANTS);
        mConstants.TIMEOUT = 100;
        mConstants.ALLOW_BG_ACTIVITY_START_TIMEOUT = 0;
        final BroadcastSkipPolicy emptySkipPolicy = new BroadcastSkipPolicy(mAms) {
            public boolean shouldSkip(BroadcastRecord r, Object o) {
                // Ignored
@@ -291,7 +291,7 @@ public class BroadcastQueueTest {
                return null;
            }
        };
        final BroadcastHistory emptyHistory = new BroadcastHistory(constants) {
        final BroadcastHistory emptyHistory = new BroadcastHistory(mConstants) {
            public void addBroadcastToHistoryLocked(BroadcastRecord original) {
                // Ignored
            }
@@ -299,13 +299,13 @@ public class BroadcastQueueTest {

        if (mImpl == Impl.DEFAULT) {
            var q = new BroadcastQueueImpl(mAms, mHandlerThread.getThreadHandler(), TAG,
                    constants, emptySkipPolicy, emptyHistory, false,
                    mConstants, emptySkipPolicy, emptyHistory, false,
                    ProcessList.SCHED_GROUP_DEFAULT);
            q.mReceiverBatch.mDeepReceiverCopy = true;
            mQueue = q;
        } else if (mImpl == Impl.MODERN) {
            var q = new BroadcastQueueModernImpl(mAms, mHandlerThread.getThreadHandler(),
                    constants, constants, emptySkipPolicy, emptyHistory);
                    mConstants, mConstants, emptySkipPolicy, emptyHistory);
            q.mReceiverBatch.mDeepReceiverCopy = true;
            mQueue = q;
        } else {
@@ -1702,6 +1702,28 @@ public class BroadcastQueueTest {
                null, null, null, null, null, false, null, null));
    }

    @Test
    public void testReplacePending_withPrioritizedBroadcasts() throws Exception {
        mConstants.MAX_RUNNING_ACTIVE_BROADCASTS = 1;
        final ProcessRecord callerApp = makeActiveProcessRecord(PACKAGE_GREEN);

        final Intent userPresent = new Intent(Intent.ACTION_USER_PRESENT)
                .addFlags(Intent.FLAG_RECEIVER_REPLACE_PENDING);

        final List receivers = List.of(
                withPriority(makeManifestReceiver(PACKAGE_GREEN, CLASS_GREEN), 100),
                withPriority(makeManifestReceiver(PACKAGE_GREEN, CLASS_RED), 50),
                withPriority(makeManifestReceiver(PACKAGE_GREEN, CLASS_YELLOW), 10),
                withPriority(makeManifestReceiver(PACKAGE_GREEN, CLASS_BLUE), 0));

        // Enqueue the broadcast a few times and verify that broadcast queues are not stuck
        // and are emptied eventually.
        for (int i = 0; i < 6; ++i) {
            enqueueBroadcast(makeBroadcastRecord(userPresent, callerApp, receivers));
        }
        waitForIdle();
    }

    @Test
    public void testIdleAndBarrier() throws Exception {
        final ProcessRecord callerApp = makeActiveProcessRecord(PACKAGE_RED);