Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 0f85ce38 authored by Jeff Brown's avatar Jeff Brown
Browse files

Improve MessageQueue sync barrier implementation.

Instead of acquiring and releasing a barrier using an up/down
counter, we post a message to the queue that represents the
barrier.  This is a more natural representation of the barrier
and better matches what we want to do with it: stall messages
behind the barrier in the queue while allowing messages earlier
in the queue to run as usual.

Refactored the MessageQueue a little bit to simplify the quit
logic and to better encapsulate the invariant that all
messages within the queue must have a valid target.  Messages
without targets are used to represent barriers.

Bug: 5721047
Change-Id: Id297d9995474b5e3f17d24e302c58168e0a00394
parent a175a5b7
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -513,7 +513,7 @@ public class Handler {
     * message queue.
     */
    public final void removeMessages(int what) {
        mQueue.removeMessages(this, what, null, true);
        mQueue.removeMessages(this, what, null);
    }

    /**
@@ -522,7 +522,7 @@ public class Handler {
     * all messages will be removed.
     */
    public final void removeMessages(int what, Object object) {
        mQueue.removeMessages(this, what, object, true);
        mQueue.removeMessages(this, what, object);
    }

    /**
@@ -539,7 +539,7 @@ public class Handler {
     * the message queue.
     */
    public final boolean hasMessages(int what) {
        return mQueue.removeMessages(this, what, null, false);
        return mQueue.hasMessages(this, what, null);
    }

    /**
@@ -547,7 +547,7 @@ public class Handler {
     * whose obj is 'object' in the message queue.
     */
    public final boolean hasMessages(int what, Object object) {
        return mQueue.removeMessages(this, what, object, false);
        return mQueue.hasMessages(this, what, object);
    }

    // if we can get rid of this method, the handler need not remember its loop
+107 −60
Original line number Diff line number Diff line
@@ -55,13 +55,13 @@ public class Looper {

    // sThreadLocal.get() will return null unless you've called prepare().
    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
    private static Looper sMainLooper;  // guarded by Looper.class

    final MessageQueue mQueue;
    final Thread mThread;
    volatile boolean mRun;

    private Printer mLogging = null;
    private static Looper mMainLooper = null;  // guarded by Looper.class
    private Printer mLogging;

     /** Initialize the current thread as a looper.
      * This gives you a chance to create handlers that then reference
@@ -70,10 +70,14 @@ public class Looper {
      * {@link #quit()}.
      */
    public static void prepare() {
        prepare(true);
    }

    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper());
        sThreadLocal.set(new Looper(quitAllowed));
    }

    /**
@@ -83,19 +87,21 @@ public class Looper {
     * to call this function yourself.  See also: {@link #prepare()}
     */
    public static void prepareMainLooper() {
        prepare();
        setMainLooper(myLooper());
        myLooper().mQueue.mQuitAllowed = false;
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();
        }

    private synchronized static void setMainLooper(Looper looper) {
        mMainLooper = looper;
    }

    /** Returns the application's main looper, which lives in the main thread of the application.
     */
    public synchronized static Looper getMainLooper() {
        return mMainLooper;
    public static Looper getMainLooper() {
        synchronized (Looper.class) {
            return sMainLooper;
        }
    }

    /**
@@ -103,22 +109,21 @@ public class Looper {
     * {@link #quit()} to end the loop.
     */
    public static void loop() {
        Looper me = myLooper();
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        MessageQueue queue = me.mQueue;
        final MessageQueue queue = me.mQueue;

        // Make sure the identity of this thread is that of the local process,
        // and keep track of what that identity token actually is.
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();

        while (true) {
        for (;;) {
            Message msg = queue.next(); // might block
            if (msg != null) {
                if (msg.target == null) {
                    // No target is a magic identifier for the quit message.
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

@@ -161,7 +166,6 @@ public class Looper {
            msg.recycle();
        }
    }
    }

    /**
     * Return the Looper object associated with the current thread.  Returns
@@ -193,18 +197,61 @@ public class Looper {
        return myLooper().mQueue;
    }

    private Looper() {
        mQueue = new MessageQueue();
    private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mRun = true;
        mThread = Thread.currentThread();
    }

    /**
     * Quits the looper.
     *
     * Causes the {@link #loop} method to terminate as soon as possible.
     */
    public void quit() {
        Message msg = Message.obtain();
        // NOTE: By enqueueing directly into the message queue, the
        // message is left with a null target.  This is how we know it is
        // a quit message.
        mQueue.enqueueMessage(msg, 0);
        mQueue.quit();
    }

    /**
     * Posts a synchronization barrier to the Looper's message queue.
     *
     * Message processing occurs as usual until the message queue encounters the
     * synchronization barrier that has been posted.  When the barrier is encountered,
     * later synchronous messages in the queue are stalled (prevented from being executed)
     * until the barrier is released by calling {@link #removeSyncBarrier} and specifying
     * the token that identifies the synchronization barrier.
     *
     * This method is used to immediately postpone execution of all subsequently posted
     * synchronous messages until a condition is met that releases the barrier.
     * Asynchronous messages (see {@link Message#isAsynchronous} are exempt from the barrier
     * and continue to be processed as usual.
     *
     * This call must be always matched by a call to {@link #removeSyncBarrier} with
     * the same token to ensure that the message queue resumes normal operation.
     * Otherwise the application will probably hang!
     *
     * @return A token that uniquely identifies the barrier.  This token must be
     * passed to {@link #removeSyncBarrier} to release the barrier.
     *
     * @hide
     */
    public final int postSyncBarrier() {
        return mQueue.enqueueSyncBarrier(SystemClock.uptimeMillis());
    }


    /**
     * Removes a synchronization barrier.
     *
     * @param token The synchronization barrier token that was returned by
     * {@link #postSyncBarrier}.
     *
     * @throws IllegalStateException if the barrier was not found.
     *
     * @hide
     */
    public final void removeSyncBarrier(int token) {
        mQueue.removeSyncBarrier(token);
    }

    /**
+149 −115
Original line number Diff line number Diff line
@@ -30,20 +30,23 @@ import java.util.ArrayList;
 * {@link Looper#myQueue() Looper.myQueue()}.
 */
public class MessageQueue {
    // True if the message queue can be quit.
    private final boolean mQuitAllowed;

    @SuppressWarnings("unused")
    private int mPtr; // used by native code

    Message mMessages;
    private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
    private IdleHandler[] mPendingIdleHandlers;
    private boolean mQuiting;
    boolean mQuitAllowed = true;

    // Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
    private boolean mBlocked;

    // Indicates the barrier nesting level.
    private int mBarrierNestCount;

    @SuppressWarnings("unused")
    private int mPtr; // used by native code
    // The next barrier token.
    // Barriers are indicated by messages with a null target whose arg1 field carries the token.
    private int mNextBarrierToken;

    private native void nativeInit();
    private native void nativeDestroy();
@@ -97,53 +100,8 @@ public class MessageQueue {
        }
    }

    /**
     * Acquires a synchronization barrier.
     *
     * While a synchronization barrier is active, only asynchronous messages are
     * permitted to execute.  Synchronous messages are retained but are not executed
     * until the synchronization barrier is released.
     *
     * This method is used to immediately postpone execution of all synchronous messages
     * until a condition is met that releases the barrier.  Asynchronous messages are
     * exempt from the barrier and continue to be executed as usual.
     *
     * This call nests and must be matched by an equal number of calls to
     * {@link #releaseSyncBarrier}.
     *
     * @hide
     */
    public final void acquireSyncBarrier() {
        synchronized (this) {
            mBarrierNestCount += 1;
        }
    }

    /**
     * Releases a synchronization barrier.
     *
     * This class undoes one invocation of {@link #acquireSyncBarrier}.
     *
     * @throws IllegalStateException if the barrier is not acquired.
     *
     * @hide
     */
    public final void releaseSyncBarrier() {
        synchronized (this) {
            if (mBarrierNestCount == 0) {
                throw new IllegalStateException("The message queue synchronization barrier "
                        + "has not been acquired.");
            }

            mBarrierNestCount -= 1;
            if (!mBlocked || mMessages == null) {
                return;
            }
        }
        nativeWake(mPtr);
    }

    MessageQueue() {
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        nativeInit();
    }

@@ -167,26 +125,26 @@ public class MessageQueue {
            nativePollOnce(mPtr, nextPollTimeoutMillis);

            synchronized (this) {
                if (mQuiting) {
                    return null;
                }

                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();

                Message prevMsg = null;
                Message msg = mMessages;
                for (;;) {
                    if (msg == null) {
                        // No more messages.
                        nextPollTimeoutMillis = -1;
                        break;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }

                    final long when = msg.when;
                    if (now < when) {
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(when - now, Integer.MAX_VALUE);
                        break;
                    }

                    if (mBarrierNestCount == 0 || msg.isAsynchronous()) {
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
@@ -199,16 +157,16 @@ public class MessageQueue {
                        msg.markInUse();
                        return msg;
                    }

                    // We have a message that we could return except that it is
                    // blocked by the sync barrier.  In particular, this means that
                    // we are not idle yet, so we do not want to run the idle handlers.
                    prevMsg = msg;
                    msg = msg.next;
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }

                // If first time idle, then get the number of idlers to run.
                if (pendingIdleHandlerCount < 0 && msg == mMessages) {
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
@@ -252,27 +210,94 @@ public class MessageQueue {
        }
    }

    final void quit() {
        if (!mQuitAllowed) {
            throw new RuntimeException("Main thread not allowed to quit.");
        }

        synchronized (this) {
            if (mQuiting) {
                return;
            }
            mQuiting = true;
        }
        nativeWake(mPtr);
    }

    final int enqueueSyncBarrier(long when) {
        // Enqueue a new sync barrier token.
        // We don't need to wake the queue because the purpose of a barrier is to stall it.
        synchronized (this) {
            final int token = mNextBarrierToken++;
            final Message msg = Message.obtain();
            msg.arg1 = token;

            Message prev = null;
            Message p = mMessages;
            if (when != 0) {
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
            }
            if (prev != null) { // invariant: p == prev.next
                msg.next = p;
                prev.next = msg;
            } else {
                msg.next = p;
                mMessages = msg;
            }
            return token;
        }
    }

    final void removeSyncBarrier(int token) {
        // Remove a sync barrier token from the queue.
        // If the queue is no longer stalled by a barrier then wake it.
        final boolean needWake;
        synchronized (this) {
            Message prev = null;
            Message p = mMessages;
            while (p != null && (p.target != null || p.arg1 != token)) {
                prev = p;
                p = p.next;
            }
            if (p == null) {
                throw new IllegalStateException("The specified message queue synchronization "
                        + " barrier token has not been posted or has already been removed.");
            }
            if (prev != null) {
                prev.next = p.next;
                needWake = false;
            } else {
                mMessages = p.next;
                needWake = mMessages == null || mMessages.target != null;
            }
            p.recycle();
        }
        if (needWake) {
            nativeWake(mPtr);
        }
    }

    final boolean enqueueMessage(Message msg, long when) {
        if (msg.isInUse()) {
            throw new AndroidRuntimeException(msg
                    + " This message is already in use.");
            throw new AndroidRuntimeException(msg + " This message is already in use.");
        }
        if (msg.target == null && !mQuitAllowed) {
            throw new RuntimeException("Main thread not allowed to quit");
        if (msg.target == null) {
            throw new AndroidRuntimeException("Message must have a target.");
        }
        final boolean needWake;

        boolean needWake;
        synchronized (this) {
            if (mQuiting) {
                RuntimeException e = new RuntimeException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w("MessageQueue", e.getMessage(), e);
                return false;
            } else if (msg.target == null) {
                mQuiting = true;
            }

            msg.when = when;
            //Log.d("MessageQueue", "Enqueing: " + msg);
            Message p = mMessages;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
@@ -281,18 +306,22 @@ public class MessageQueue {
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless the message is asynchronous and it might be
                // possible for it to be returned out of sequence relative to an earlier
                // synchronous message at the head of the queue.
                Message prev = null;
                while (p != null && p.when <= when) {
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = prev.next;
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
                needWake = mBlocked && mBarrierNestCount != 0 && msg.isAsynchronous()
                        && !mMessages.isAsynchronous();
            }
        }
        if (needWake) {
@@ -301,17 +330,34 @@ public class MessageQueue {
        return true;
    }

    final boolean removeMessages(Handler h, int what, Object object,
            boolean doRemove) {
    final boolean hasMessages(Handler h, int what, Object object) {
        if (h == null) {
            return false;
        }

        synchronized (this) {
            Message p = mMessages;
            while (p != null) {
                if (p.target == h && p.what == what && (object == null || p.obj == object)) {
                    return true;
                }
                p = p.next;
            }
            return false;
        }
    }

    final void removeMessages(Handler h, int what, Object object) {
        if (h == null) {
            return;
        }

        synchronized (this) {
            Message p = mMessages;
            boolean found = false;

            // Remove all messages at front.
            while (p != null && p.target == h && p.what == what
                   && (object == null || p.obj == object)) {
                if (!doRemove) return true;
                found = true;
                Message n = p.next;
                mMessages = n;
                p.recycle();
@@ -324,8 +370,6 @@ public class MessageQueue {
                if (n != null) {
                    if (n.target == h && n.what == what
                        && (object == null || n.obj == object)) {
                        if (!doRemove) return true;
                        found = true;
                        Message nn = n.next;
                        n.recycle();
                        p.next = nn;
@@ -334,13 +378,11 @@ public class MessageQueue {
                }
                p = n;
            }
            
            return found;
        }
    }

    final void removeMessages(Handler h, Runnable r, Object object) {
        if (r == null) {
        if (h == null || r == null) {
            return;
        }

@@ -374,6 +416,10 @@ public class MessageQueue {
    }

    final void removeCallbacksAndMessages(Handler h, Object object) {
        if (h == null) {
            return;
        }

        synchronized (this) {
            Message p = mMessages;

@@ -401,16 +447,4 @@ public class MessageQueue {
            }
        }
    }

    /*
    private void dumpQueue_l()
    {
        Message p = mMessages;
        System.out.println(this + "  queue is:");
        while (p != null) {
            System.out.println("            " + p);
            p = p.next;
        }
    }
    */
}