Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit a3d9bb14 authored by Kevin Jeon's avatar Kevin Jeon
Browse files

Add MessageStack, a Treiber stack of Messages

MessageStack allows for concurrent adding and removing of messages, and
hooks into MessageHeap for ordering of messages. This is intended for
use in the new message queue implementation.

Test: atest android.os.MessageStackTest
Bug: 421623328
Flag: EXEMPT new data structure isn't used yet; usages will be flagged.
Change-Id: I019d427ce9df4ca9dbb761375f9854ed9a595fa7
parent 0bd4ed12
Loading
Loading
Loading
Loading
+8 −4
Original line number Diff line number Diff line
@@ -1543,7 +1543,11 @@ public final class MessageQueue {
        }
    }

    static final class MatchHandlerWhatAndObject extends MessageCompare {
    /**
     * Matches handler, what, and object if non-null.
     * @hide
     */
    public static final class MatchHandlerWhatAndObject extends MessageCompare {
        @Override
        public boolean compareMessage(Message m, Handler h, int what, Object object, Runnable r,
                long when) {
@@ -2593,7 +2597,7 @@ public final class MessageQueue {
                MethodHandles.Lookup l = MethodHandles.lookup();
                sRemovedFromStack = l.findVarHandle(MessageQueue.MessageNode.class,
                        "mRemovedFromStackValue", boolean.class);
            } catch (Exception e) {
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
@@ -2673,7 +2677,7 @@ public final class MessageQueue {
                    long.class);
            sMptrRefCount = l.findVarHandle(MessageQueue.class, "mMptrRefCountValue",
                    long.class);
        } catch (Exception e) {
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }
@@ -2790,7 +2794,7 @@ public final class MessageQueue {
                MethodHandles.Lookup l = MethodHandles.lookup();
                sCounts = l.findVarHandle(MessageQueue.MessageCounts.class, "mCountsValue",
                        long.class);
            } catch (Exception e) {
            } catch (ReflectiveOperationException e) {
                throw new ExceptionInInitializerError(e);
            }
        }
+17 −0
Original line number Diff line number Diff line
@@ -312,6 +312,23 @@ public class Handler {
        return new Handler(looper, callback, true);
    }

    /**
     * Create a new Handler with all reference fields explicitly provided. This is intended to be
     * used for creating a Handler sentinel object.
     */
    private Handler(Looper looper, MessageQueue queue, Callback callback) {
        mLooper = looper;
        mQueue = queue;
        mCallback = callback;
        mAsynchronous = false;
        mIsShared = false;
    }

    /** @hide */
    static Handler createSentinelHandler() {
        return new Handler(null, null, null);
    }

    /** @hide */
    @UnsupportedAppUsage(maxTargetSdk = Build.VERSION_CODES.R, trackingBug = 170729553)
    @NonNull
+70 −4
Original line number Diff line number Diff line
@@ -24,6 +24,8 @@ import android.util.proto.ProtoOutputStream;

import com.android.internal.annotations.VisibleForTesting;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.atomic.AtomicLong;

/**
@@ -123,11 +125,46 @@ public final class Message implements Parcelable {
    /** If set message is asynchronous */
    /*package*/ static final int FLAG_ASYNCHRONOUS = 1 << 1;

    /** If the message is marked for removal */
    /* package*/ static final int FLAG_REMOVED = 1 << 2;

    /** Flags to clear in the copyFrom method */
    /*package*/ static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE;
    /*package*/ static final int FLAGS_TO_CLEAR_ON_COPY_FROM = FLAG_IN_USE | FLAG_REMOVED;

    @UnsupportedAppUsage
    /*package*/ int flags;
    /*package*/ volatile int flags;
    /*package*/ static final VarHandle sFlags;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            sFlags = l.findVarHandle(Message.class, "flags", int.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * CAS flags with FLAG_REMOVED to indicate that this message should be removed. Returns false if
     * the message has already been marked for removal.
     *
     * The CAS loop shouldn't fail due to FLAG_IN_USE or FLAG_ASYNCHRONOUS being set because those
     * are only changed when a message is initially created and enqueued. However, we loop anyways
     * in case additional flags that can be modified are added in the future.
     */
    boolean markRemoved() {
        int localFlags;
        do {
            localFlags = this.flags;
            if ((localFlags & FLAG_REMOVED) != 0) {
                return false;
            }
        } while (!sFlags.compareAndSet(this, localFlags, localFlags | FLAG_REMOVED));
        return true;
    }

    boolean isRemoved() {
       return (this.flags & FLAG_REMOVED) != 0;
    }

    /**
     * The targeted delivery time of this message. The time-base is
@@ -156,6 +193,10 @@ public final class Message implements Parcelable {
    @UnsupportedAppUsage
    /*package*/ Message next;

    // only used in MessageStack
    /*package*/ Message prev;
    /*package*/ Message nextFree;

    /**
     * For trace flows, if tracing is enabled.
     */
@@ -367,6 +408,27 @@ public final class Message implements Parcelable {
        data = null;
    }

    // Sentinel values used to clear reference fields with a valid 'null' value, to avoid grabbing a
    // removed message when matching for 'null' in these fields.
    private static final Object NULL_OBJECT = new Object();
    private static final Handler NULL_HANDLER = Handler.createSentinelHandler();
    private static final Runnable NULL_RUNNABLE = () -> {};

    /**
     * Clear reference fields to avoid retaining any objects. This is used in MessageStack's message
     * removal functions, and differs from clear() in that 'flags' and links (next, prev, nextFree)
     * are not cleared, as they are still needed to indicate that a message is removed and to
     * traverse the stack.
     */
    void clearReferenceFields() {
        obj = NULL_OBJECT;
        replyTo = null;
        sendingThreadName = null;
        data = null;
        target = NULL_HANDLER;
        callback = NULL_RUNNABLE;
    }

    /**
     * Make this message like o.  Performs a shallow copy of the data field.
     * Does not copy the linked list fields, nor the timestamp or
@@ -409,7 +471,9 @@ public final class Message implements Parcelable {
     * worry about yours conflicting with other handlers.
     */
    public Handler getTarget() {
        return target;
        // We assign this first to avoid a data race that could potentially expose NULL_HANDLER.
        final Handler ret = target;
        return ret == NULL_HANDLER ? null : ret;
    }

    /**
@@ -421,7 +485,9 @@ public final class Message implements Parcelable {
     * {@link Handler#handleMessage(Message)}.
     */
    public Runnable getCallback() {
        return callback;
        // We assign this first to avoid a data race that could potentially expose NULL_RUNNABLE.
        final Runnable ret = callback;
        return ret == NULL_RUNNABLE ? null : ret;
    }

    /** @hide */
+12 −0
Original line number Diff line number Diff line
@@ -94,6 +94,8 @@ public final class MessageHeap {
        Message tmp = mHeap[x];
        mHeap[x] = mHeap[y];
        mHeap[y] = tmp;
        mHeap[x].heapIndex = x;
        mHeap[y].heapIndex = y;
    }

    private void siftDown(int i) {
@@ -199,6 +201,7 @@ public final class MessageHeap {
            mNumElements--;

            mHeap[0] = mHeap[mNumElements];
            mHeap[0].heapIndex = 0;
            mHeap[mNumElements] = null;

            siftDown(0);
@@ -223,6 +226,7 @@ public final class MessageHeap {
        } else {
            mNumElements--;
            mHeap[i] = mHeap[mNumElements];
            mHeap[i].heapIndex = i;
            mHeap[mNumElements] = null;
            if (!siftUp(i)) {
                siftDown(i);
@@ -232,7 +236,10 @@ public final class MessageHeap {
    }

    public void removeMessage(@NonNull Message m) throws IllegalArgumentException {
        // We set this index to be out of range so that we don't attempt to remove this message from
        // the heap a second time (e.g. when it's processed on the MessageStack freelist).
        remove(m.heapIndex);
        m.heapIndex = -1;
    }

    public void removeAll() {
@@ -288,6 +295,11 @@ public final class MessageHeap {
        int localHeapCount = 0;
        for (int i = 0; i < mHeap.length; i++) {
            if (mHeap[i] != null) {
                if (mHeap[i].heapIndex != i) {
                    Log.e(TAG, "Verify failure: message at " + i + " has heapIndex "
                            + mHeap[i].heapIndex);
                    return false;
                }
                localHeapCount++;
            }
        }
+283 −0
Original line number Diff line number Diff line
/*
 * Copyright (C) 2025 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package android.os;

import android.annotation.Nullable;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/**
 * Treiber stack of Message objects, used in NewNewMessageQueue.
 * @hide
 */
public final class MessageStack {
    private static final String TAG = "MessageStack";

    private static final VarHandle sTop;
    private volatile Message mTopValue = null;

    private static final VarHandle sFreelistHead;
    private volatile Message mFreelistHeadValue = null;

    // The underlying min-heaps that are used for ordering Messages.
    private final MessageHeap mSyncHeap = new MessageHeap();
    private final MessageHeap mAsyncHeap = new MessageHeap();

    // This points to the most-recently processed message. Comparison with mTopValue will indicate
    // whether some messages still need to be processed.
    private Message mLooperProcessed = null;

    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            sTop = l.findVarHandle(MessageStack.class, "mTopValue",
                    Message.class);
            sFreelistHead = l.findVarHandle(MessageStack.class, "mFreelistHeadValue",
                    Message.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    /**
     * Pushes a message onto the top of the stack with a CAS.
     */
    public void pushMessage(Message m) {
        // TODO: This should fail if the current top value is the shutdown sentinel.
        Message current;
        do {
            current = mTopValue;
            m.next = current;
        } while (!sTop.weakCompareAndSetRelease(this, current, m));

    }

    /**
     * Iterates through messages and creates a reverse-ordered chain of messages to remove.
     */
    public void updateFreelist(MessageQueue.MessageCompare compare, Handler h, int what,
            Object object, Runnable r, long when) {
        Message current = (Message) sTop.getAcquire(this);
        Message prev = null;
        Message firstRemoved = null;

        while (current != null) {
            // Check that the message hasn't already been removed or processed elsewhere.
            if (!current.isRemoved()
                    && compare.compareMessage(current, h, what, object, r, when)
                    && current.markRemoved()) {
                if (firstRemoved == null) {
                    firstRemoved = current;
                }
                current.clearReferenceFields();
                // nextFree links each to-be-removed message to the one processed before.
                current.nextFree = prev;
                prev = current;
            }
            current = current.next;
        }

        Message freelist;
        do {
            freelist = mFreelistHeadValue;
            firstRemoved.nextFree = freelist;
        // prev points to the last to-be-removed message that was processed.
        } while (!sFreelistHead.compareAndSet(this, freelist, prev));
    }

    /**
     * Adds not-yet-processed messages into the MessageHeap and creates backlinks.
     */
    public void heapSweep() {
        Message current = (Message) sTop.getAcquire(this);
        Message prevLooperProcessed = mLooperProcessed;
        mLooperProcessed = current;

        while (current != null && current != prevLooperProcessed) {
            if (current.next != null) {
                current.next.prev = current;
            }
            // MessageHeap will maintain its own ordering of Messages, so it doesn't matter that we
            // insert these Messages in a different order than submitted to the stack.
            // TODO: Removed messages shouldn't be added to the heap, and possibly not into the
            // stack either.
            if (current.isAsynchronous()) {
                mAsyncHeap.add(current);
            } else {
                mSyncHeap.add(current);
            }
            current = current.next;
        }

        // TODO: Investigate inserting in-submitted-order with a second traversal using backlinks.
    }

    /**
     * Iterate through the freelist and unlink Messages.
     */
    public void drainFreelist() {
        Message current = (Message) sFreelistHead.getAndSetAcquire(this, null);
        while (current != null) {
            Message nextFree = current.nextFree;
            current.nextFree = null;
            removeMessage(current, /* removeFromHeap= */ true);
            current = nextFree;
        }
    }

    /**
     * Get a message from the MessageHeap, remove its links within this stack, then return it.
     *
     * This will return null if there are no more items in the heap, or if there was a race and the
     * polled message was removed.
     */
    public Message pop(boolean async) {
        final Message m = async ? mAsyncHeap.poll() : mSyncHeap.poll();
        if (m != null) {
            // We CAS this so that a remover doesn't attempt to add it to the freelist. If this CAS
            // fails, it has already been removed, and links will be cleared in a drainFreelist()
            // pass.
            if (!m.markRemoved()) {
                return null;
            }
            removeMessage(m, /* removeFromHeap= */ false);
        }
        return m;
    }

    /**
     * Remove a message from the stack.
     *
     * removeFromHeap indicates if the message should be removed from the heap (if this message is
     * being drained from the freelist) or not (if this message was retrieved using
     * MessageHeap.pop()).
     */
    private void removeMessage(Message m, boolean removeFromHeap) {
        // An out of range heapIndex means that we've already removed this message from the heap
        // during the MessageHeap.peek() loop in peek().
        if (removeFromHeap && m.heapIndex >= 0) {
            if (m.isAsynchronous()) {
                mAsyncHeap.removeMessage(m);
            } else {
                mSyncHeap.removeMessage(m);
            }
        }

        // mLooperProcessed must be updated to the next message that hasn't been removed.
        if (m == mLooperProcessed) {
            do {
                mLooperProcessed = mLooperProcessed.next;
            } while (mLooperProcessed != null && mLooperProcessed.isRemoved());
        }
        // If this is the top, attempt to CAS the top to the next item.
        if (m == mTopValue) {
            // Since only the looper thread can pop or drain the freelist, if this CAS fails, it
            // can only be due to a push or quit.
            if (sTop.compareAndSet(this, m, m.next)) {
                unlinkFromNext(m);
                m.prev = null;
                return;
            }
            // If the CAS failed, this is no longer the top, and we must find m's predecessor and
            // create backlinks before continuing to remove the message the normal way.
            heapSweep();
        }
        unlinkFromNext(m);
        unlinkFromPrev(m);
        m.prev = null;
    }

    private static void unlinkFromNext(Message m) {
        if (m.next != null) {
            m.next.prev = m.prev;
        }
    }

    private static void unlinkFromPrev(Message m) {
        if (m.prev != null) {
            m.prev.next = m.next;
        }
    }

    /**
     * Return the next non-removed Message.
     *
     * A null return value indicates that the underlying heap was either empty or only contained
     * removed messages.
     */
    public @Nullable Message peek(boolean async) {
        while (true) {
            final Message m = async ? mAsyncHeap.peek() : mSyncHeap.peek();
            if (m == null) {
                return null;
            }
            if (!m.isRemoved()) {
                return m;
            }
            if (async) {
                mAsyncHeap.removeMessage(m);
            } else {
                mSyncHeap.removeMessage(m);
            }
        }
    }

    /**
     * Remove the input Message.
     *
     * This is suitable to use with the output of peek().
     */
    public void remove(Message m) {
        removeMessage(m, /* removeFromHeap= */ true);
    }

    /**
     * Returns the number of non-removed messages in this stack.
     */
    public int sizeForTest() {
        int size = 0;
        Message current = (Message) sTop.getAcquire(this);
        while (current != null) {
            if (!current.isRemoved()) {
                size++;
            }
            current = current.next;
        }
        return size;
    }

    /**
     * Returns the number of messages in the freelist.
     */
    public int freelistSizeForTest() {
        int size = 0;
        Message current = (Message) sFreelistHead.getAcquire(this);
        while (current != null) {
            size++;
            current = current.nextFree;
        }
        return size;
    }

    /**
     * Returns the number of messages in the underlying MessageHeaps.
     */
    public int combinedHeapSizesForTest() {
        return mSyncHeap.size() + mAsyncHeap.size();
    }

}
Loading