Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 80ee645a authored by Jeff Sharkey's avatar Jeff Sharkey
Browse files

Initial pass at "Snowbird" broadcast queue.

Now that we have a solid platform carved out, we can start a second
BroadcastQueue implementation with the new ideas we're pursuing for
the "Snowbird" project.

Generally speaking, this new implementation pivots broadcast dispatch
to be handled on a per-process basis, as represented by the new
BroadcastProcessQueue class.  Each per-process queue has the concept
of being "runnable at" a specific time in the future, which supports
us pausing or delaying broadcast delivery on a per-process basis.

This initial pass is just enough of the functionality to pass the
local unit tests we've recently written, which currently cover very
basic situations like manifest-vs-registered and warm-vs-cold starts;
much of the remaining work is left as "TODO" statements, which will
be fleshed out via TDD.

Bug: 245771249
Test: atest FrameworksMockingServicesTests:BroadcastQueueTest
Change-Id: If83377c2012dd16fe8f62f90704263a977763dc7
parent f577ac06
Loading
Loading
Loading
Loading
+36 −22
Original line number Diff line number Diff line
@@ -667,10 +667,17 @@ public class ActivityManagerService extends IActivityManager.Stub
    // we still create this new offload queue, but never ever put anything on it.
    final boolean mEnableOffloadQueue;
    final BroadcastQueue mFgBroadcastQueue;
    final BroadcastQueue mBgBroadcastQueue;
    final BroadcastQueue mBgOffloadBroadcastQueue;
    final BroadcastQueue mFgOffloadBroadcastQueue;
    /**
     * Flag indicating if we should use {@link BroadcastQueueModernImpl} instead
     * of the default {@link BroadcastQueueImpl}.
     */
    final boolean mEnableModernQueue;
    static final int BROADCAST_QUEUE_FG = 0;
    static final int BROADCAST_QUEUE_BG = 1;
    static final int BROADCAST_QUEUE_BG_OFFLOAD = 2;
    static final int BROADCAST_QUEUE_FG_OFFLOAD = 3;
    // Convenient for easy iteration over the queues. Foreground is first
    // so that dispatch of foreground broadcasts gets precedence.
    final BroadcastQueue[] mBroadcastQueues;
@@ -692,12 +699,16 @@ public class ActivityManagerService extends IActivityManager.Stub
    }
    BroadcastQueue broadcastQueueForFlags(int flags, Object cookie) {
        if (mEnableModernQueue) {
            return mBroadcastQueues[0];
        }
        if (isOnFgOffloadQueue(flags)) {
            if (DEBUG_BROADCAST_BACKGROUND) {
                Slog.i(TAG_BROADCAST,
                        "Broadcast intent " + cookie + " on foreground offload queue");
            }
            return mFgOffloadBroadcastQueue;
            return mBroadcastQueues[BROADCAST_QUEUE_FG_OFFLOAD];
        }
        if (isOnBgOffloadQueue(flags)) {
@@ -705,14 +716,15 @@ public class ActivityManagerService extends IActivityManager.Stub
                Slog.i(TAG_BROADCAST,
                        "Broadcast intent " + cookie + " on background offload queue");
            }
            return mBgOffloadBroadcastQueue;
            return mBroadcastQueues[BROADCAST_QUEUE_BG_OFFLOAD];
        }
        final boolean isFg = (flags & Intent.FLAG_RECEIVER_FOREGROUND) != 0;
        if (DEBUG_BROADCAST_BACKGROUND) Slog.i(TAG_BROADCAST,
                "Broadcast intent " + cookie + " on "
                + (isFg ? "foreground" : "background") + " queue");
        return (isFg) ? mFgBroadcastQueue : mBgBroadcastQueue;
        return (isFg) ? mBroadcastQueues[BROADCAST_QUEUE_FG]
                : mBroadcastQueues[BROADCAST_QUEUE_BG];
    }
    private volatile int mDeviceOwnerUid = INVALID_UID;
@@ -2365,9 +2377,8 @@ public class ActivityManagerService extends IActivityManager.Stub
        mPendingStartActivityUids = new PendingStartActivityUids();
        mUseFifoUiScheduling = false;
        mEnableOffloadQueue = false;
        mEnableModernQueue = false;
        mBroadcastQueues = new BroadcastQueue[0];
        mFgBroadcastQueue = mBgBroadcastQueue = mBgOffloadBroadcastQueue =
                mFgOffloadBroadcastQueue = null;
        mComponentAliasResolver = new ComponentAliasResolver(this);
    }
@@ -2423,20 +2434,23 @@ public class ActivityManagerService extends IActivityManager.Stub
        mEnableOffloadQueue = SystemProperties.getBoolean(
                "persist.device_config.activity_manager_native_boot.offload_queue_enabled", true);
        mEnableModernQueue = SystemProperties.getBoolean(
                "persist.device_config.activity_manager_native_boot.modern_queue_enabled", false);
        if (mEnableModernQueue) {
            mBroadcastQueues = new BroadcastQueue[1];
            mBroadcastQueues[0] = new BroadcastQueueModernImpl(this, mHandler, foreConstants);
        } else {
            mBroadcastQueues = new BroadcastQueue[4];
        mFgBroadcastQueue = new BroadcastQueueImpl(this, mHandler,
            mBroadcastQueues[BROADCAST_QUEUE_FG] = new BroadcastQueueImpl(this, mHandler,
                    "foreground", foreConstants, false, ProcessList.SCHED_GROUP_DEFAULT);
        mBgBroadcastQueue = new BroadcastQueueImpl(this, mHandler,
            mBroadcastQueues[BROADCAST_QUEUE_BG] = new BroadcastQueueImpl(this, mHandler,
                    "background", backConstants, true, ProcessList.SCHED_GROUP_BACKGROUND);
        mBgOffloadBroadcastQueue = new BroadcastQueueImpl(this, mHandler,
            mBroadcastQueues[BROADCAST_QUEUE_BG_OFFLOAD] = new BroadcastQueueImpl(this, mHandler,
                    "offload_bg", offloadConstants, true, ProcessList.SCHED_GROUP_BACKGROUND);
        mFgOffloadBroadcastQueue = new BroadcastQueueImpl(this, mHandler,
            mBroadcastQueues[BROADCAST_QUEUE_FG_OFFLOAD] = new BroadcastQueueImpl(this, mHandler,
                    "offload_fg", foreConstants, true, ProcessList.SCHED_GROUP_BACKGROUND);
        mBroadcastQueues[0] = mFgBroadcastQueue;
        mBroadcastQueues[1] = mBgBroadcastQueue;
        mBroadcastQueues[2] = mBgOffloadBroadcastQueue;
        mBroadcastQueues[3] = mFgOffloadBroadcastQueue;
        }
        mServices = new ActiveServices(this);
        mCpHelper = new ContentProviderHelper(this, true);
+298 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.server.am;

import static com.android.server.am.BroadcastQueue.checkState;

import android.annotation.NonNull;
import android.annotation.Nullable;
import android.annotation.UptimeMillisLong;
import android.os.UserHandle;
import android.util.IndentingPrintWriter;

import com.android.internal.os.SomeArgs;

import java.util.ArrayDeque;

/**
 * Queue of pending {@link BroadcastRecord} entries intended for delivery to a
 * specific process.
 * <p>
 * Each queue has a concept of being "runnable at" a particular time in the
 * future, which supports arbitrarily pausing or delaying delivery on a
 * per-process basis.
 * <p>
 * Internally each queue consists of a pending broadcasts which are waiting to
 * be dispatched, and a single active broadcast which is currently being
 * dispatched.
 */
class BroadcastProcessQueue implements Comparable<BroadcastProcessQueue> {
    /**
     * Default delay to apply to background broadcasts, giving a chance for
     * debouncing of rapidly changing events.
     */
    // TODO: shift hard-coded defaults to BroadcastConstants
    private static final long DELAY_DEFAULT_MILLIS = 10_000;

    /**
     * Default delay to apply to broadcasts targeting cached applications.
     */
    // TODO: shift hard-coded defaults to BroadcastConstants
    private static final long DELAY_CACHED_MILLIS = 30_000;

    final @NonNull String processName;
    final int uid;

    /**
     * Linked list connection to another process under this {@link #uid} which
     * has a different {@link #processName}.
     */
    @Nullable BroadcastProcessQueue next;

    /**
     * Currently known details about the target process; typically undefined
     * when the process isn't actively running.
     */
    @Nullable ProcessRecord app;

    /**
     * Ordered collection of broadcasts that are waiting to be dispatched to
     * this process, as a pair of {@link BroadcastRecord} and the index into
     * {@link BroadcastRecord#receivers} that represents the receiver.
     */
    private final ArrayDeque<SomeArgs> mPending = new ArrayDeque<>();

    /**
     * Broadcast actively being dispatched to this process.
     */
    private @Nullable BroadcastRecord mActive;

    /**
     * Receiver actively being dispatched to in this process. This is an index
     * into the {@link BroadcastRecord#receivers} list of {@link #mActive}.
     */
    private int mActiveIndex;

    private int mCountForeground;
    private int mCountOrdered;
    private int mCountAlarm;

    private @UptimeMillisLong long mRunnableAt = Long.MAX_VALUE;
    private boolean mRunnableAtInvalidated;

    private boolean mProcessCached;

    public BroadcastProcessQueue(@NonNull String processName, int uid) {
        this.processName = processName;
        this.uid = uid;
    }

    /**
     * Enqueue the given broadcast to be dispatched to this process at some
     * future point in time. The target receiver is indicated by the given index
     * into {@link BroadcastRecord#receivers}.
     */
    public void enqueueBroadcast(@NonNull BroadcastRecord record, int recordIndex) {
        // Detect situations where the incoming broadcast should cause us to
        // recalculate when we'll be runnable
        if (mPending.isEmpty()) {
            invalidateRunnableAt();
        }
        if (record.isForeground()) {
            mCountForeground++;
            invalidateRunnableAt();
        }
        if (record.ordered) {
            mCountOrdered++;
            invalidateRunnableAt();
        }
        if (record.alarm) {
            mCountAlarm++;
            invalidateRunnableAt();
        }
        SomeArgs args = SomeArgs.obtain();
        args.arg1 = record;
        args.argi1 = recordIndex;
        mPending.addLast(args);
    }

    /**
     * Update if this process is in the "cached" state, typically signaling that
     * broadcast dispatch should be paused or delayed.
     */
    public void setProcessCached(boolean cached) {
        if (mProcessCached != cached) {
            mProcessCached = cached;
            invalidateRunnableAt();
        }
    }

    /**
     * Return if we know of an actively running "warm" process for this queue.
     */
    public boolean isProcessWarm() {
        return (app != null) && (app.getThread() != null) && !app.isKilled();
    }

    public int getPreferredSchedulingGroupLocked() {
        if (mCountForeground > 0 || mCountOrdered > 0 || mCountAlarm > 0) {
            // We have an important broadcast somewhere down the queue, so
            // boost priority until we drain them all
            return ProcessList.SCHED_GROUP_DEFAULT;
        } else if ((mActive != null)
                && (mActive.isForeground() || mActive.ordered || mActive.alarm)) {
            // We have an important broadcast right now, so boost priority
            return ProcessList.SCHED_GROUP_DEFAULT;
        } else {
            return ProcessList.SCHED_GROUP_BACKGROUND;
        }
    }

    /**
     * Set the currently active broadcast to the next pending broadcast.
     */
    public void makeActiveNextPending() {
        // TODO: what if the next broadcast isn't runnable yet?
        checkState(isRunnable(), "isRunnable");
        final SomeArgs next = mPending.removeFirst();
        mActive = (BroadcastRecord) next.arg1;
        mActiveIndex = next.argi1;
        next.recycle();
        if (mActive.isForeground()) {
            mCountForeground--;
        }
        if (mActive.ordered) {
            mCountOrdered--;
        }
        if (mActive.alarm) {
            mCountAlarm--;
        }
        invalidateRunnableAt();
    }

    /**
     * Set the currently running broadcast to be idle.
     */
    public void makeActiveIdle() {
        mActive = null;
        mActiveIndex = 0;
    }

    public void setActiveDeliveryState(int deliveryState) {
        checkState(isActive(), "isActive");
        mActive.setDeliveryState(mActiveIndex, deliveryState);
    }

    public @NonNull BroadcastRecord getActive() {
        checkState(isActive(), "isActive");
        return mActive;
    }

    public @NonNull Object getActiveReceiver() {
        checkState(isActive(), "isActive");
        return mActive.receivers.get(mActiveIndex);
    }

    public boolean isActive() {
        return mActive != null;
    }

    public boolean isRunnable() {
        if (mRunnableAtInvalidated) updateRunnableAt();
        return mRunnableAt != Long.MAX_VALUE;
    }

    /**
     * Return time at which this process is considered runnable. This is
     * typically the time at which the next pending broadcast was first
     * enqueued, but it also reflects any pauses or delays that should be
     * applied to the process.
     * <p>
     * Returns {@link Long#MAX_VALUE} when this queue isn't currently runnable,
     * typically when the queue is empty or when paused.
     */
    public @UptimeMillisLong long getRunnableAt() {
        if (mRunnableAtInvalidated) updateRunnableAt();
        return mRunnableAt;
    }

    private void invalidateRunnableAt() {
        mRunnableAtInvalidated = true;
    }

    /**
     * Update {@link #getRunnableAt()} if it's currently invalidated.
     */
    private void updateRunnableAt() {
        final SomeArgs next = mPending.peekFirst();
        if (next != null) {
            final long runnableAt = ((BroadcastRecord) next.arg1).enqueueTime;
            if (mCountForeground > 0) {
                mRunnableAt = runnableAt;
            } else if (mCountOrdered > 0) {
                mRunnableAt = runnableAt;
            } else if (mCountAlarm > 0) {
                mRunnableAt = runnableAt;
            } else if (mProcessCached) {
                mRunnableAt = runnableAt + DELAY_CACHED_MILLIS;
            } else {
                mRunnableAt = runnableAt + DELAY_DEFAULT_MILLIS;
            }
        } else {
            mRunnableAt = Long.MAX_VALUE;
        }
    }

    @Override
    public int compareTo(BroadcastProcessQueue o) {
        if (mRunnableAtInvalidated) updateRunnableAt();
        if (o.mRunnableAtInvalidated) o.updateRunnableAt();
        return Long.compare(mRunnableAt, o.mRunnableAt);
    }

    @Override
    public String toString() {
        return "BroadcastProcessQueue{"
                + Integer.toHexString(System.identityHashCode(this))
                + " " + processName + "/" + UserHandle.formatUid(uid) + "}";
    }

    public String toShortString() {
        return processName + "/" + UserHandle.formatUid(uid);
    }

    public void dumpLocked(@NonNull IndentingPrintWriter pw) {
        if ((mActive == null) && mPending.isEmpty()) return;

        pw.println(toShortString());
        pw.increaseIndent();
        if (mActive != null) {
            pw.print("🏃 ");
            pw.print(mActive.toShortString());
            pw.print(' ');
            pw.println(mActive.receivers.get(mActiveIndex));
        }
        for (SomeArgs args : mPending) {
            final BroadcastRecord r = (BroadcastRecord) args.arg1;
            pw.print("\u3000 ");
            pw.print(r.toShortString());
            pw.print(' ');
            pw.println(r.receivers.get(args.argi1));
        }
        pw.decreaseIndent();
    }
}
+649 −0

File added.

Preview size limit exceeded, changes collapsed.

+6 −2
Original line number Diff line number Diff line
@@ -90,7 +90,8 @@ public class BroadcastQueueTest {
    private final Impl mImpl;

    private enum Impl {
        DEFAULT
        DEFAULT,
        MODERN,
    }

    private Context mContext;
@@ -114,7 +115,7 @@ public class BroadcastQueueTest {

    @Parameters(name = "impl={0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[][] { {Impl.DEFAULT} });
        return Arrays.asList(new Object[][] { {Impl.DEFAULT}, {Impl.MODERN} });
    }

    public BroadcastQueueTest(Impl impl) {
@@ -177,6 +178,9 @@ public class BroadcastQueueTest {
            mQueue = new BroadcastQueueImpl(mAms, mHandlerThread.getThreadHandler(), TAG,
                    constants, emptySkipPolicy, emptyHistory, false,
                    ProcessList.SCHED_GROUP_DEFAULT);
        } else if (mImpl == Impl.MODERN) {
            mQueue = new BroadcastQueueModernImpl(mAms, mHandlerThread.getThreadHandler(),
                    constants, emptySkipPolicy, emptyHistory);
        } else {
            throw new UnsupportedOperationException();
        }