Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 9cf7f2b5 authored by Mark Fasheh's avatar Mark Fasheh
Browse files

Use a stack node to indicate that MessageQueue is quitting

This makes it easier to reason about our quitting logic.  In particular,
it's very easy to determine that enqueue won't race with teardown.

We further simplify things by having next() remove messages during
teardown for us.

Test: atest MessageQueueTest
Test: atest LooperTest
Test: atest HandlerTest
Test: atest HandlerThreadTest
Bug: 411488376
Bug: 413185945
Flag: build.RELEASE_PACKAGE_MESSAGEQUEUE_IMPLEMENTATION
Change-Id: I3aac55ad987ff94a71b5f07e81070eb77809abe7
parent d12598e5
Loading
Loading
Loading
Loading
+171 −75
Original line number Diff line number Diff line
@@ -401,7 +401,7 @@ public final class MessageQueue {

    private boolean isPollingConcurrent() {
        // If the loop is quitting then it must not be idling.
        if (incrementMptrRefs()) {
        if (!getQuitting() && incrementMptrRefs()) {
            try {
                return nativeIsPolling(mPtr);
            } finally {
@@ -655,9 +655,10 @@ public final class MessageQueue {

    private static final AtomicLong mMessagesDelivered = new AtomicLong();

    /* This is only read/written from the Looper thread. For use with Concurrent MQ */
    /* These are only read/written from the Looper thread. For use with Concurrent MQ */
    private int mNextPollTimeoutMillis;
    private boolean mMessageDirectlyQueued;
    private boolean mWorkerShouldQuit;
    private Message nextMessage(boolean peek, boolean returnEarliest) {
        int i = 0;

@@ -668,20 +669,50 @@ public final class MessageQueue {
            }

            mDrainingLock.lock();
            try {
                mNextIsDrainingStack = true;
            } finally {
                mDrainingLock.unlock();
            }

            StackNode oldTop;
            QuittingNode quittingNode = null;
            /*
             * Set our state to active, drain any items from the stack into our priority queues
             * Set our state to active, drain any items from the stack into our priority queues.
             * If we are quitting we won't swap away the stack as we want to retain the quitting
             * node for enqueue and remove to see.
             */
            StackNode oldTop;
            oldTop = swapAndSetStackStateActive();
            boolean shouldRemoveMessages = false;
            if (oldTop.isQuittingNode()) {
                quittingNode = (QuittingNode) oldTop;
                if (!mWorkerShouldQuit) {
                    mWorkerShouldQuit = true;
                    /*
                     * Only remove messages from the queue the first time we encounter a quitting
                     * node, to avoid O(n^2) runtime if we quit safely and there's a lot of nodes
                     * in the queue.
                     */
                    shouldRemoveMessages = true;
                }
            }
            drainStack(oldTop);

            mDrainingLock.lock();
            try {
                mNextIsDrainingStack = false;
                mDrainCompleted.signalAll();
            } finally {
                mDrainingLock.unlock();
            }

            if (shouldRemoveMessages) {
                if (quittingNode.mRemoveAll) {
                    removeAllMessages();
                } else {
                    removeAllFutureMessages(quittingNode.mTS);
                }
            }

            /*
             * The objective of this next block of code is to:
@@ -786,7 +817,8 @@ public final class MessageQueue {
             */
            StateNode nextOp = sStackStateActive;
            if (found == null) {
                if (getQuitting()) {
                if (mWorkerShouldQuit) {
                    // Set to zero so we can keep looping and finding messages until we're done.
                    mNextPollTimeoutMillis = 0;
                } else if (next == null) {
                    /* No message to deliver, sleep indefinitely */
@@ -819,7 +851,7 @@ public final class MessageQueue {
             * Try to swap our state from Active back to Park or TimedPark. If we raced with
             * enqueue, loop back around to pick up any new items.
             */
            if (sState.compareAndSet(this, sStackStateActive, nextOp)) {
            if (mWorkerShouldQuit || sState.compareAndSet(this, sStackStateActive, nextOp)) {
                mMessageCounts.clearCounts();
                if (found != null) {
                    if (!peek && !removeFromPriorityQueue(found)) {
@@ -864,7 +896,8 @@ public final class MessageQueue {
            }

            // Prevent any race between quit()/nativeWake() and dispose()
            if (checkQuittingAndWaitForRefsToDrop()) {
            if (mWorkerShouldQuit) {
                setMptrTeardownAndWaitForRefsToDrop();
                dispose();
                return null;
            }
@@ -1055,20 +1088,30 @@ public final class MessageQueue {
        }

        if (mUseConcurrent) {
            if (incrementMptrRefsAndSetQuitting()) {
                try {
                    if (safe) {
                        removeAllFutureMessages();
            QuittingNode quittingNode = new QuittingNode(safe);
            while (true) {
                StackNode old = (StackNode) sState.getVolatile(this);
                if (old.isQuittingNode()) {
                    return;
                }
                quittingNode.mNext = old;
                if (old.isMessageNode()) {
                    quittingNode.mBottomOfStack = ((MessageNode) old).mBottomOfStack;
                } else {
                        removeAllMessages();
                    quittingNode.mBottomOfStack = (StateNode) old;
                }

                    // We can assume mPtr != 0 while we hold a ref on our quitting state
                if (sState.compareAndSet(this, old, quittingNode)) {
                    if (incrementMptrRefs()) {
                        try {
                            nativeWake(mPtr);
                        } finally {
                            decrementMptrRefs();
                        }
                    }
                    return;
                }
            }
        } else {
            synchronized (this) {
                if (mQuitting) {
@@ -2125,9 +2168,9 @@ public final class MessageQueue {
            return true;
        }
    }
    private final MatchAllMessages mMatchAllMessages = new MatchAllMessages();
    private static final MatchAllMessages sMatchAllMessages = new MatchAllMessages();
    private void removeAllMessages() {
        findOrRemoveMessages(null, -1, null, null, 0, mMatchAllMessages, true);
        findOrRemoveMessages(null, -1, null, null, 0, sMatchAllMessages, true);
    }

    private static final class MatchAllFutureMessages extends MessageCompare {
@@ -2135,16 +2178,15 @@ public final class MessageQueue {
        public boolean compareMessage(MessageNode n, Handler h, int what, Object object, Runnable r,
                long when) {
            final Message m = n.mMessage;
                    if (m.when > when) {
                return true;
            }
            return false;

            return n.mMessage.when > when;
        }
    }
    private final MatchAllFutureMessages mMatchAllFutureMessages = new MatchAllFutureMessages();
    private void removeAllFutureMessages() {
        findOrRemoveMessages(null, -1, null, null, SystemClock.uptimeMillis(),
                mMatchAllFutureMessages, true);
    private static final MatchAllFutureMessages sMatchAllFutureMessages =
            new MatchAllFutureMessages();
    private void removeAllFutureMessages(long now) {
        findOrRemoveMessages(null, -1, null, null, now,
                sMatchAllFutureMessages, true);
    }

    @NeverCompile
@@ -2416,6 +2458,18 @@ public final class MessageQueue {

    /* Move any non-cancelled messages into the priority queue */
    private void drainStack(StackNode oldTop) {
        QuittingNode quittingNode = oldTop.isQuittingNode() ? (QuittingNode) oldTop : null;
        if (quittingNode != null) {
            oldTop = quittingNode.mNext;
            /*
             * The stack is still visible so we must be careful.
             * Enqueue will only ever see the quitting node so we don't have to worry about races
             * there.
             * Remove may walk the stack but it should be fine to either see the
             * new stack or the old one.
             */
            quittingNode.mNext = quittingNode.mBottomOfStack;
        }
        while (oldTop.isMessageNode()) {
            MessageNode oldTopMessageNode = (MessageNode) oldTop;
            if (oldTopMessageNode.removeFromStack()) {
@@ -2433,12 +2487,16 @@ public final class MessageQueue {
        }
    }

    /* Set the stack state to Active, return a list of nodes to walk. */
    /**
     *  Set the stack state to Active, return a list of nodes to walk.
     *  If we are already active or quitting simply return the list without swapping.
     *  In the quitting case this will leave the stack state to whatever value it previously had.
     */
    private StackNode swapAndSetStackStateActive() {
        while (true) {
            /* Set stack state to Active, get node list to walk later */
            StackNode current = (StackNode) sState.getVolatile(this);
            if (current == sStackStateActive
            if (current == sStackStateActive || current.isQuittingNode()
                    || sState.compareAndSet(this, current, sStackStateActive)) {
                return current;
            }
@@ -2448,6 +2506,9 @@ public final class MessageQueue {
        if (node.isMessageNode()) {
            return ((MessageNode) node).mBottomOfStack;
        }
        if (node.isQuittingNode()) {
            return ((QuittingNode) node).mBottomOfStack;
        }
        return (StateNode) node;
    }

@@ -2486,6 +2547,13 @@ public final class MessageQueue {
     * deadline
     */
    private static final int STACK_NODE_TIMEDPARK = 3;
    /*
     * Tells us that the looper is quitting. Quit() will place this on top of the stack and
     * wake our looper thread. Once a quitting node is on top of the stack, it stays there. If
     * enqueue sees this node it will refuse to queue up new messages. Remove knows to skip a
     * quitting node.
     */
    private static final int STACK_NODE_QUITTING = 4;

    /* Describes a node in the Treiber stack */
    static class StackNode {
@@ -2504,6 +2572,28 @@ public final class MessageQueue {
        final boolean isMessageNode() {
            return mType == STACK_NODE_MESSAGE;
        }

        final boolean isQuittingNode() {
            return mType == STACK_NODE_QUITTING;
        }
    }

    static final class QuittingNode extends StackNode {
        volatile StackNode mNext;
        StateNode mBottomOfStack;
        final boolean mRemoveAll;
        final long mTS;

        QuittingNode(boolean safe) {
            super(STACK_NODE_QUITTING);
            if (safe) {
                mTS = SystemClock.uptimeMillis();
                mRemoveAll = false;
            } else {
                mTS = 0;
                mRemoveAll = true;
            }
        }
    }

    static final class MessageNode extends StackNode implements Comparable<MessageNode> {
@@ -2602,14 +2692,14 @@ public final class MessageQueue {
    private volatile long mNextFrontInsertSeqValue = -1;

    /*
     * Combine our quitting state with a ref count of access to mPtr.
     * next() doesn't want to dispose of mPtr until quit() is done processing messages.
     * Ref count our access to mPtr.
     * next() doesn't want to dispose of mPtr until after quit() is called.
     * isPolling() also needs to ensure safe access to mPtr.
     * So keep a ref count of access to mPtr. If quitting is set, we disallow new refs.
     * next() will only proceed with disposing of the pointer once all refs are dropped.
     */
    private static VarHandle sQuittingRefCount;
    private volatile long mQuittingRefCountValue = 0;
    private static VarHandle sMptrRefCount;
    private volatile long mMptrRefCountValue = 0;

    static {
        try {
@@ -2620,7 +2710,7 @@ public final class MessageQueue {
                    long.class);
            sNextFrontInsertSeq = l.findVarHandle(MessageQueue.class, "mNextFrontInsertSeqValue",
                    long.class);
            sQuittingRefCount = l.findVarHandle(MessageQueue.class, "mQuittingRefCountValue",
            sMptrRefCount = l.findVarHandle(MessageQueue.class, "mMptrRefCountValue",
                    long.class);
        } catch (Exception e) {
            Log.wtf(TAG_C, "VarHandle lookup failed with exception: " + e);
@@ -2628,8 +2718,8 @@ public final class MessageQueue {
        }
    }

    // Use MSB to indicate quitting state. Lower 63 bits hold ref count.
    private static final long QUITTING_MASK = 1L << 63;
    // Use MSB to indicate mPtr teardown state. Lower 63 bits hold ref count.
    private static final long MPTR_TEARDOWN_MASK = 1L << 63;

    /**
     * Increment the mPtr ref count.
@@ -2642,12 +2732,12 @@ public final class MessageQueue {
     */
    private boolean incrementMptrRefs() {
        while (true) {
            final long oldVal = mQuittingRefCountValue;
            if ((oldVal & QUITTING_MASK) != 0) {
            final long oldVal = mMptrRefCountValue;
            if ((oldVal & MPTR_TEARDOWN_MASK) != 0) {
                // If we're quitting then we're not allowed to increment the ref count.
                return false;
            }
            if (sQuittingRefCount.compareAndSet(this, oldVal, oldVal + 1)) {
            if (sMptrRefCount.compareAndSet(this, oldVal, oldVal + 1)) {
                // Successfully incremented the ref count without quitting.
                return true;
            }
@@ -2660,27 +2750,13 @@ public final class MessageQueue {
     * Call after {@link #incrementMptrRefs()} to release the ref on mPtr.
     */
    private void decrementMptrRefs() {
        long oldVal = (long)sQuittingRefCount.getAndAdd(this, -1);
        long oldVal = (long) sMptrRefCount.getAndAdd(this, -1);
        // If quitting and we were the last ref, wake up looper thread
        if (oldVal - 1 == QUITTING_MASK) {
        if (oldVal - 1 == MPTR_TEARDOWN_MASK) {
            LockSupport.unpark(mLooperThread);
        }
    }

    private boolean incrementMptrRefsAndSetQuitting() {
        while (true) {
            final long oldVal = mQuittingRefCountValue;
            if ((oldVal & QUITTING_MASK) != 0) {
                // If we're quitting then we're not allowed to increment the ref count.
                return false;
            }
            if (sQuittingRefCount.compareAndSet(this, oldVal, (oldVal + 1) | QUITTING_MASK)) {
                // Successfully incremented the ref count and set quitting.
                return true;
            }
        }
    }

    /**
     * Wake the looper thread.
     *
@@ -2712,17 +2788,22 @@ public final class MessageQueue {
    }

    private boolean getQuitting() {
        return (mQuittingRefCountValue & QUITTING_MASK) != 0;
        return ((StackNode) sState.getVolatile(this)).isQuittingNode();
    }

    // Must only be called from looper thread
    private boolean checkQuittingAndWaitForRefsToDrop() {
        if (!getQuitting()) {
            return false;
    private void setMptrTeardownAndWaitForRefsToDrop() {
        while (true) {
            final long oldVal = mMptrRefCountValue;
            if (sMptrRefCount.compareAndSet(this, oldVal, oldVal | MPTR_TEARDOWN_MASK)) {
                // Successfully set teardown state.
                break;
            }
        }

        boolean wasInterrupted = false;
        try {
            while ((mQuittingRefCountValue & ~QUITTING_MASK) != 0) {
            while ((mMptrRefCountValue & ~MPTR_TEARDOWN_MASK) != 0) {
                LockSupport.park();
                wasInterrupted |= Thread.interrupted();
            }
@@ -2731,7 +2812,6 @@ public final class MessageQueue {
                mLooperThread.interrupt();
            }
        }
        return true;
    }

    /*
@@ -2837,14 +2917,6 @@ public final class MessageQueue {
    private final Condition mDrainCompleted = mDrainingLock.newCondition();

    private boolean enqueueMessageUnchecked(@NonNull Message msg, long when) {
        if (getQuitting()) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG_C, e.getMessage(), e);
            msg.recycleUnchecked();
            return false;
        }

        long seq = when != 0 ? ((long) sNextInsertSeq.getAndAdd(this, 1L) + 1L)
                : ((long) sNextFrontInsertSeq.getAndAdd(this, -1L) - 1L);
        /* TODO: Add a MessageNode member to Message so we can avoid this allocation */
@@ -2862,6 +2934,14 @@ public final class MessageQueue {
        final Looper myLooper = Looper.myLooper();
        /* If we are running on the looper thread we can add directly to the priority queue */
        if (myLooper != null && myLooper.getQueue() == this) {
            if (getQuitting()) {
                IllegalStateException e = new IllegalStateException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w(TAG_C, e.getMessage(), e);
                msg.recycleUnchecked();
                return false;
            }

            node.removeFromStack();
            insertIntoPriorityQueue(node);
            /*
@@ -2907,6 +2987,13 @@ public final class MessageQueue {
                    node.mWokeUp = wakeNeeded;
                    break;

                case STACK_NODE_QUITTING:
                    IllegalStateException e = new IllegalStateException(
                            msg.target + " sending message to a Handler on a dead thread");
                    Log.w(TAG_C, e.getMessage(), e);
                    msg.recycleUnchecked();
                    return false;

                default:
                    MessageNode oldMessage = (MessageNode) old;

@@ -2955,6 +3042,15 @@ public final class MessageQueue {
            return false;
        }

        if (top.isQuittingNode()) {
            QuittingNode quittingNode = (QuittingNode) top;
            if (quittingNode.mNext.isMessageNode()) {
                top = quittingNode.mNext;
            } else {
                waitForDrainCompleted();
                return false;
            }
        }
        /*
         * We have messages that we may tombstone. Walk the stack until we hit the bottom or we
         * hit a null pointer.