Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit d5b0d8c5 authored by Jeff Sharkey's avatar Jeff Sharkey
Browse files

Fix bug around deferred prioritized broadcasts.

When deciding if a particular "tranche" of prioritized broadcasts has
been finished, we should consider both the terminal and deferred
states of members of that tranche.  Previous attempts to do this used
a pure counting-based mechanism, which can risk opening up future
tranches too early when many receivers have been deferred.

This fix pivots to a new "beyond" concept that tracks the highest
receiver that a broadcast has "moved beyond".  This can be either a
terminal state, or deferred states within the currently active
tranche.  Once we've moved "beyond" particular receiver, we treat
it as a high-water mark, even when previously deferred broadcasts
that we've moved beyond pivot back to a pending state.

Bug: 272147987
Test: atest FrameworksMockingServicesTests:BroadcastQueueTest
Test: atest FrameworksMockingServicesTests:BroadcastQueueModernImplTest
Test: atest FrameworksMockingServicesTests:BroadcastRecordTest
Change-Id: Iac4c5c7e92a7e009acbe1c248f6f56106152f032
parent 1be68459
Loading
Loading
Loading
Loading
+9 −29
Original line number Diff line number Diff line
@@ -18,7 +18,6 @@ package com.android.server.am;

import static com.android.internal.util.Preconditions.checkState;
import static com.android.server.am.BroadcastRecord.deliveryStateToString;
import static com.android.server.am.BroadcastRecord.isDeliveryStateTerminal;
import static com.android.server.am.BroadcastRecord.isReceiverEquals;

import android.annotation.IntDef;
@@ -709,7 +708,7 @@ class BroadcastProcessQueue {
                || consecutiveHighPriorityCount >= maxHighPriorityDispatchLimit);
        final boolean isLPQueueEligible = shouldConsiderLPQueue
                && nextLPRecord.enqueueTime <= nextHPRecord.enqueueTime
                && !blockedOnOrderedDispatch(nextLPRecord, nextLPRecordIndex);
                && !nextLPRecord.isBlocked(nextLPRecordIndex);
        return isLPQueueEligible ? lowPriorityQueue : highPriorityQueue;
    }

@@ -912,39 +911,20 @@ class BroadcastProcessQueue {
        }
    }

    private boolean blockedOnOrderedDispatch(BroadcastRecord r, int index) {
        final int blockedUntilTerminalCount = r.blockedUntilTerminalCount[index];

        int existingDeferredCount = 0;
        if (r.deferUntilActive) {
            for (int i = 0; i < index; i++) {
                if (r.deferredUntilActive[i]) existingDeferredCount++;
            }
        }

        // We might be blocked waiting for other receivers to finish,
        // typically for an ordered broadcast or priority traunches
        if ((r.terminalCount + existingDeferredCount) < blockedUntilTerminalCount
                && !isDeliveryStateTerminal(r.getDeliveryState(index))) {
            return true;
        }
        return false;
    }

    /**
     * Update {@link #getRunnableAt()} if it's currently invalidated.
     */
    private void updateRunnableAt() {
        final SomeArgs next = peekNextBroadcast();
        if (!mRunnableAtInvalidated) return;
        mRunnableAtInvalidated = false;

        final SomeArgs next = peekNextBroadcast();
        if (next != null) {
            final BroadcastRecord r = (BroadcastRecord) next.arg1;
            final int index = next.argi1;
            final long runnableAt = r.enqueueTime;

            // If we're specifically queued behind other ordered dispatch activity,
            // we aren't runnable yet
            if (blockedOnOrderedDispatch(r, index)) {
            if (r.isBlocked(index)) {
                mRunnableAt = Long.MAX_VALUE;
                mRunnableAtReason = REASON_BLOCKED;
                return;
@@ -1262,12 +1242,12 @@ class BroadcastProcessQueue {
            pw.print(info.activityInfo.name);
        }
        pw.println();
        final int blockedUntilTerminalCount = record.blockedUntilTerminalCount[recordIndex];
        if (blockedUntilTerminalCount != -1) {
        final int blockedUntilBeyondCount = record.blockedUntilBeyondCount[recordIndex];
        if (blockedUntilBeyondCount != -1) {
            pw.print("    blocked until ");
            pw.print(blockedUntilTerminalCount);
            pw.print(blockedUntilBeyondCount);
            pw.print(", currently at ");
            pw.print(record.terminalCount);
            pw.print(record.beyondCount);
            pw.print(" of ");
            pw.println(record.receivers.size());
        }
+23 −29
Original line number Diff line number Diff line
@@ -1008,6 +1008,7 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
        }

        final BroadcastRecord r = queue.getActive();
        final int index = queue.getActiveIndex();
        if (r.ordered) {
            r.resultCode = resultCode;
            r.resultData = resultData;
@@ -1015,18 +1016,24 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
            if (!r.isNoAbort()) {
                r.resultAbort = resultAbort;
            }
        }

        // To ensure that "beyond" high-water marks are updated in a monotonic
        // way, we finish this receiver before possibly skipping any remaining
        // aborted receivers
        final boolean res = finishReceiverActiveLocked(queue,
                BroadcastRecord.DELIVERY_DELIVERED, "remote app");

        // When the caller aborted an ordered broadcast, we mark all
        // remaining receivers as skipped
        if (r.resultAbort) {
                for (int i = r.terminalCount + 1; i < r.receivers.size(); i++) {
            for (int i = index + 1; i < r.receivers.size(); i++) {
                setDeliveryState(null, null, r, i, r.receivers.get(i),
                        BroadcastRecord.DELIVERY_SKIPPED, "resultAbort");
            }
        }
        }

        return finishReceiverActiveLocked(queue, BroadcastRecord.DELIVERY_DELIVERED, "remote app");
        return res;
    }

    /**
@@ -1108,21 +1115,10 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
            @NonNull Object receiver, @DeliveryState int newDeliveryState,
            @NonNull String reason) {
        final int cookie = traceBegin("setDeliveryState");
        final int oldDeliveryState = getDeliveryState(r, index);
        boolean checkFinished = false;

        // Only apply state when we haven't already reached a terminal state;
        // this is how we ignore racing timeout messages
        if (!isDeliveryStateTerminal(oldDeliveryState)) {
            r.setDeliveryState(index, newDeliveryState, reason);
            if (oldDeliveryState == BroadcastRecord.DELIVERY_DEFERRED) {
                r.deferredCount--;
            } else if (newDeliveryState == BroadcastRecord.DELIVERY_DEFERRED) {
                // If we're deferring a broadcast, maybe that's enough to unblock the final callback
                r.deferredCount++;
                checkFinished = true;
            }
        }
        // Remember the old state and apply the new state
        final int oldDeliveryState = getDeliveryState(r, index);
        final boolean beyondCountChanged = r.setDeliveryState(index, newDeliveryState, reason);

        // Emit any relevant tracing results when we're changing the delivery
        // state as part of running from a queue
@@ -1147,15 +1143,13 @@ class BroadcastQueueModernImpl extends BroadcastQueue {
                        + deliveryStateToString(newDeliveryState) + " because " + reason);
            }

            r.terminalCount++;
            notifyFinishReceiver(queue, app, r, index, receiver);
            checkFinished = true;
        }
        // When entire ordered broadcast finished, deliver final result
        if (checkFinished) {
            final boolean recordFinished =
                    ((r.terminalCount + r.deferredCount) == r.receivers.size());
            if (recordFinished) {

        // When we've reached a new high-water mark, we might be in a position
        // to unblock other receivers or the final resultTo
        if (beyondCountChanged) {
            if (r.beyondCount == r.receivers.size()) {
                scheduleResultTo(r);
            }

+91 −35
Original line number Diff line number Diff line
@@ -24,6 +24,7 @@ import static com.android.server.am.BroadcastConstants.DEFER_BOOT_COMPLETED_BROA
import static com.android.server.am.BroadcastConstants.DEFER_BOOT_COMPLETED_BROADCAST_NONE;
import static com.android.server.am.BroadcastConstants.DEFER_BOOT_COMPLETED_BROADCAST_TARGET_T_ONLY;

import android.annotation.CheckResult;
import android.annotation.CurrentTimeMillisLong;
import android.annotation.ElapsedRealtimeLong;
import android.annotation.IntDef;
@@ -101,8 +102,7 @@ final class BroadcastRecord extends Binder {
    final @NonNull List<Object> receivers;   // contains BroadcastFilter and ResolveInfo
    final @DeliveryState int[] delivery;   // delivery state of each receiver
    final @NonNull String[] deliveryReasons; // reasons for delivery state of each receiver
    final boolean[] deferredUntilActive; // whether each receiver is infinitely deferred
    final int[] blockedUntilTerminalCount; // blocked until count of each receiver
    final int[] blockedUntilBeyondCount; // blocked until count of each receiver
    @Nullable ProcessRecord resultToApp; // who receives final result if non-null
    @Nullable IIntentReceiver resultTo; // who receives final result if non-null
    boolean deferred;
@@ -134,6 +134,7 @@ final class BroadcastRecord extends Binder {
    int manifestSkipCount;  // number of manifest receivers skipped.
    int terminalCount;      // number of receivers in terminal state.
    int deferredCount;      // number of receivers in deferred state.
    int beyondCount;        // high-water number of receivers we've moved beyond.
    @Nullable BroadcastQueue queue;   // the outbound queue handling this broadcast

    // Determines the privileges the app's process has in regard to background starts.
@@ -219,6 +220,23 @@ final class BroadcastRecord extends Binder {
        }
    }

    /**
     * Return if the given delivery state is "beyond", which means that we've
     * moved beyond this receiver, and future receivers are now unblocked.
     */
    static boolean isDeliveryStateBeyond(@DeliveryState int deliveryState) {
        switch (deliveryState) {
            case DELIVERY_DELIVERED:
            case DELIVERY_SKIPPED:
            case DELIVERY_TIMEOUT:
            case DELIVERY_FAILURE:
            case DELIVERY_DEFERRED:
                return true;
            default:
                return false;
        }
    }

    ProcessRecord curApp;       // hosting application of current receiver.
    ComponentName curComponent; // the receiver class that is currently running.
    ActivityInfo curReceiver;   // the manifest receiver that is currently running.
@@ -356,7 +374,7 @@ final class BroadcastRecord extends Binder {
                TimeUtils.formatDuration(terminalTime[i] - scheduledTime[i], pw);
                pw.print(' ');
            }
            pw.print("("); pw.print(blockedUntilTerminalCount[i]); pw.print(") ");
            pw.print("("); pw.print(blockedUntilBeyondCount[i]); pw.print(") ");
            pw.print("#"); pw.print(i); pw.print(": ");
            if (o instanceof BroadcastFilter) {
                pw.println(o);
@@ -411,8 +429,7 @@ final class BroadcastRecord extends Binder {
        urgent = calculateUrgent(_intent, _options);
        deferUntilActive = calculateDeferUntilActive(_callingUid,
                _options, _resultTo, _serialized, urgent);
        deferredUntilActive = new boolean[deferUntilActive ? delivery.length : 0];
        blockedUntilTerminalCount = calculateBlockedUntilTerminalCount(receivers, _serialized);
        blockedUntilBeyondCount = calculateBlockedUntilBeyondCount(receivers, _serialized);
        scheduledTime = new long[delivery.length];
        terminalTime = new long[delivery.length];
        resultToApp = _resultToApp;
@@ -423,7 +440,7 @@ final class BroadcastRecord extends Binder {
        ordered = _serialized;
        sticky = _sticky;
        initialSticky = _initialSticky;
        prioritized = isPrioritized(blockedUntilTerminalCount, _serialized);
        prioritized = isPrioritized(blockedUntilBeyondCount, _serialized);
        userId = _userId;
        nextReceiver = 0;
        state = IDLE;
@@ -467,8 +484,7 @@ final class BroadcastRecord extends Binder {
        delivery = from.delivery;
        deliveryReasons = from.deliveryReasons;
        deferUntilActive = from.deferUntilActive;
        deferredUntilActive = from.deferredUntilActive;
        blockedUntilTerminalCount = from.blockedUntilTerminalCount;
        blockedUntilBeyondCount = from.blockedUntilBeyondCount;
        scheduledTime = from.scheduledTime;
        terminalTime = from.terminalTime;
        resultToApp = from.resultToApp;
@@ -627,32 +643,72 @@ final class BroadcastRecord extends Binder {
    /**
     * Update the delivery state of the given {@link #receivers} index.
     * Automatically updates any time measurements related to state changes.
     *
     * @return if {@link #beyondCount} changed due to this state transition,
     *         indicating that other events may be unblocked.
     */
    void setDeliveryState(int index, @DeliveryState int deliveryState,
    @CheckResult
    boolean setDeliveryState(int index, @DeliveryState int newDeliveryState,
            @NonNull String reason) {
        delivery[index] = deliveryState;
        deliveryReasons[index] = reason;
        if (deferUntilActive) deferredUntilActive[index] = false;
        switch (deliveryState) {
        final int oldDeliveryState = delivery[index];
        if (isDeliveryStateTerminal(oldDeliveryState)
                || newDeliveryState == oldDeliveryState) {
            // We've already arrived in terminal or requested state, so leave
            // any statistics and reasons intact from the first transition
            return false;
        }

        switch (oldDeliveryState) {
            case DELIVERY_DEFERRED:
                deferredCount--;
                break;
        }
        switch (newDeliveryState) {
            case DELIVERY_SCHEDULED:
                scheduledTime[index] = SystemClock.uptimeMillis();
                break;
            case DELIVERY_DEFERRED:
                deferredCount++;
                break;
            case DELIVERY_DELIVERED:
            case DELIVERY_SKIPPED:
            case DELIVERY_TIMEOUT:
            case DELIVERY_FAILURE:
                terminalTime[index] = SystemClock.uptimeMillis();
                terminalCount++;
                break;
            case DELIVERY_SCHEDULED:
                scheduledTime[index] = SystemClock.uptimeMillis();
                break;
            case DELIVERY_DEFERRED:
                if (deferUntilActive) deferredUntilActive[index] = true;
        }

        delivery[index] = newDeliveryState;
        deliveryReasons[index] = reason;

        // If this state change might bring us to a new high-water mark, bring
        // ourselves as high as we possibly can
        final int oldBeyondCount = beyondCount;
        if (index >= beyondCount) {
            for (int i = beyondCount; i < delivery.length; i++) {
                if (isDeliveryStateBeyond(getDeliveryState(i))) {
                    beyondCount = i + 1;
                } else {
                    break;
                }
            }
        }
        return (beyondCount != oldBeyondCount);
    }

    @DeliveryState int getDeliveryState(int index) {
        return delivery[index];
    }

    /**
     * @return if the given {@link #receivers} index should be considered
     *         blocked based on the current status of the overall broadcast.
     */
    boolean isBlocked(int index) {
        return (beyondCount < blockedUntilBeyondCount[index]);
    }

    boolean wasDeliveryAttempted(int index) {
        final int deliveryState = getDeliveryState(index);
        switch (deliveryState) {
@@ -757,36 +813,36 @@ final class BroadcastRecord extends Binder {
     * has prioritized tranches of receivers.
     */
    @VisibleForTesting
    static boolean isPrioritized(@NonNull int[] blockedUntilTerminalCount,
    static boolean isPrioritized(@NonNull int[] blockedUntilBeyondCount,
            boolean ordered) {
        return !ordered && (blockedUntilTerminalCount.length > 0)
                && (blockedUntilTerminalCount[0] != -1);
        return !ordered && (blockedUntilBeyondCount.length > 0)
                && (blockedUntilBeyondCount[0] != -1);
    }

    /**
     * Calculate the {@link #terminalCount} that each receiver should be
     * Calculate the {@link #beyondCount} that each receiver should be
     * considered blocked until.
     * <p>
     * For example, in an ordered broadcast, receiver {@code N} is blocked until
     * receiver {@code N-1} reaches a terminal state. Similarly, in a
     * prioritized broadcast, receiver {@code N} is blocked until all receivers
     * of a higher priority reach a terminal state.
     * receiver {@code N-1} reaches a terminal or deferred state. Similarly, in
     * a prioritized broadcast, receiver {@code N} is blocked until all
     * receivers of a higher priority reach a terminal or deferred state.
     * <p>
     * When there are no terminal count constraints, the blocked value for each
     * When there are no beyond count constraints, the blocked value for each
     * receiver is {@code -1}.
     */
    @VisibleForTesting
    static @NonNull int[] calculateBlockedUntilTerminalCount(
    static @NonNull int[] calculateBlockedUntilBeyondCount(
            @NonNull List<Object> receivers, boolean ordered) {
        final int N = receivers.size();
        final int[] blockedUntilTerminalCount = new int[N];
        final int[] blockedUntilBeyondCount = new int[N];
        int lastPriority = 0;
        int lastPriorityIndex = 0;
        for (int i = 0; i < N; i++) {
            if (ordered) {
                // When sending an ordered broadcast, we need to block this
                // receiver until all previous receivers have terminated
                blockedUntilTerminalCount[i] = i;
                blockedUntilBeyondCount[i] = i;
            } else {
                // When sending a prioritized broadcast, we only need to wait
                // for the previous tranche of receivers to be terminated
@@ -794,18 +850,18 @@ final class BroadcastRecord extends Binder {
                if ((i == 0) || (thisPriority != lastPriority)) {
                    lastPriority = thisPriority;
                    lastPriorityIndex = i;
                    blockedUntilTerminalCount[i] = i;
                    blockedUntilBeyondCount[i] = i;
                } else {
                    blockedUntilTerminalCount[i] = lastPriorityIndex;
                    blockedUntilBeyondCount[i] = lastPriorityIndex;
                }
            }
        }
        // If the entire list is in the same priority tranche, mark as -1 to
        // indicate that none of them need to wait
        if (N > 0 && blockedUntilTerminalCount[N - 1] == 0) {
            Arrays.fill(blockedUntilTerminalCount, -1);
        if (N > 0 && blockedUntilBeyondCount[N - 1] == 0) {
            Arrays.fill(blockedUntilBeyondCount, -1);
        }
        return blockedUntilTerminalCount;
        return blockedUntilBeyondCount;
    }

    static int getReceiverUid(@NonNull Object receiver) {
+2 −1
Original line number Diff line number Diff line
@@ -463,7 +463,8 @@ public final class BroadcastQueueModernImplTest {
        assertEquals(BroadcastProcessQueue.REASON_BLOCKED, queue.getRunnableAtReason());

        // Bumping past barrier makes us now runnable
        airplaneRecord.terminalCount++;
        airplaneRecord.setDeliveryState(0, BroadcastRecord.DELIVERY_DELIVERED,
                "testRunnableAt_Ordered");
        queue.invalidateRunnableAt();
        assertTrue(queue.isRunnable());
        assertNotEquals(BroadcastProcessQueue.REASON_BLOCKED, queue.getRunnableAtReason());
+2 −2
Original line number Diff line number Diff line
@@ -464,7 +464,7 @@ public class BroadcastQueueTest {

        doAnswer((invocation) -> {
            Log.v(TAG, "Intercepting scheduleReceiver() for "
                    + Arrays.toString(invocation.getArguments()));
                    + Arrays.toString(invocation.getArguments()) + " package " + ai.packageName);
            assertHealth();
            final Intent intent = invocation.getArgument(0);
            final Bundle extras = invocation.getArgument(5);
@@ -486,7 +486,7 @@ public class BroadcastQueueTest {

        doAnswer((invocation) -> {
            Log.v(TAG, "Intercepting scheduleRegisteredReceiver() for "
                    + Arrays.toString(invocation.getArguments()));
                    + Arrays.toString(invocation.getArguments()) + " package " + ai.packageName);
            assertHealth();
            final Intent intent = invocation.getArgument(1);
            final Bundle extras = invocation.getArgument(4);
Loading