Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit aa917f85 authored by Jeff Brown's avatar Jeff Brown Committed by Android (Google) Code Review
Browse files

Merge "Improve MessageQueue sync barrier implementation."

parents 50eb3b9b 0f85ce38
Loading
Loading
Loading
Loading
+4 −4
Original line number Diff line number Diff line
@@ -513,7 +513,7 @@ public class Handler {
     * message queue.
     */
    public final void removeMessages(int what) {
        mQueue.removeMessages(this, what, null, true);
        mQueue.removeMessages(this, what, null);
    }

    /**
@@ -522,7 +522,7 @@ public class Handler {
     * all messages will be removed.
     */
    public final void removeMessages(int what, Object object) {
        mQueue.removeMessages(this, what, object, true);
        mQueue.removeMessages(this, what, object);
    }

    /**
@@ -539,7 +539,7 @@ public class Handler {
     * the message queue.
     */
    public final boolean hasMessages(int what) {
        return mQueue.removeMessages(this, what, null, false);
        return mQueue.hasMessages(this, what, null);
    }

    /**
@@ -547,7 +547,7 @@ public class Handler {
     * whose obj is 'object' in the message queue.
     */
    public final boolean hasMessages(int what, Object object) {
        return mQueue.removeMessages(this, what, object, false);
        return mQueue.hasMessages(this, what, object);
    }

    // if we can get rid of this method, the handler need not remember its loop
+107 −60
Original line number Diff line number Diff line
@@ -55,13 +55,13 @@ public class Looper {

    // sThreadLocal.get() will return null unless you've called prepare().
    static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();
    private static Looper sMainLooper;  // guarded by Looper.class

    final MessageQueue mQueue;
    final Thread mThread;
    volatile boolean mRun;

    private Printer mLogging = null;
    private static Looper mMainLooper = null;  // guarded by Looper.class
    private Printer mLogging;

     /** Initialize the current thread as a looper.
      * This gives you a chance to create handlers that then reference
@@ -70,10 +70,14 @@ public class Looper {
      * {@link #quit()}.
      */
    public static void prepare() {
        prepare(true);
    }

    private static void prepare(boolean quitAllowed) {
        if (sThreadLocal.get() != null) {
            throw new RuntimeException("Only one Looper may be created per thread");
        }
        sThreadLocal.set(new Looper());
        sThreadLocal.set(new Looper(quitAllowed));
    }

    /**
@@ -83,19 +87,21 @@ public class Looper {
     * to call this function yourself.  See also: {@link #prepare()}
     */
    public static void prepareMainLooper() {
        prepare();
        setMainLooper(myLooper());
        myLooper().mQueue.mQuitAllowed = false;
        prepare(false);
        synchronized (Looper.class) {
            if (sMainLooper != null) {
                throw new IllegalStateException("The main Looper has already been prepared.");
            }
            sMainLooper = myLooper();
        }

    private synchronized static void setMainLooper(Looper looper) {
        mMainLooper = looper;
    }

    /** Returns the application's main looper, which lives in the main thread of the application.
     */
    public synchronized static Looper getMainLooper() {
        return mMainLooper;
    public static Looper getMainLooper() {
        synchronized (Looper.class) {
            return sMainLooper;
        }
    }

    /**
@@ -103,22 +109,21 @@ public class Looper {
     * {@link #quit()} to end the loop.
     */
    public static void loop() {
        Looper me = myLooper();
        final Looper me = myLooper();
        if (me == null) {
            throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
        }
        MessageQueue queue = me.mQueue;
        final MessageQueue queue = me.mQueue;

        // Make sure the identity of this thread is that of the local process,
        // and keep track of what that identity token actually is.
        Binder.clearCallingIdentity();
        final long ident = Binder.clearCallingIdentity();

        while (true) {
        for (;;) {
            Message msg = queue.next(); // might block
            if (msg != null) {
                if (msg.target == null) {
                    // No target is a magic identifier for the quit message.
            if (msg == null) {
                // No message indicates that the message queue is quitting.
                return;
            }

@@ -161,7 +166,6 @@ public class Looper {
            msg.recycle();
        }
    }
    }

    /**
     * Return the Looper object associated with the current thread.  Returns
@@ -193,18 +197,61 @@ public class Looper {
        return myLooper().mQueue;
    }

    private Looper() {
        mQueue = new MessageQueue();
    private Looper(boolean quitAllowed) {
        mQueue = new MessageQueue(quitAllowed);
        mRun = true;
        mThread = Thread.currentThread();
    }

    /**
     * Quits the looper.
     *
     * Causes the {@link #loop} method to terminate as soon as possible.
     */
    public void quit() {
        Message msg = Message.obtain();
        // NOTE: By enqueueing directly into the message queue, the
        // message is left with a null target.  This is how we know it is
        // a quit message.
        mQueue.enqueueMessage(msg, 0);
        mQueue.quit();
    }

    /**
     * Posts a synchronization barrier to the Looper's message queue.
     *
     * Message processing occurs as usual until the message queue encounters the
     * synchronization barrier that has been posted.  When the barrier is encountered,
     * later synchronous messages in the queue are stalled (prevented from being executed)
     * until the barrier is released by calling {@link #removeSyncBarrier} and specifying
     * the token that identifies the synchronization barrier.
     *
     * This method is used to immediately postpone execution of all subsequently posted
     * synchronous messages until a condition is met that releases the barrier.
     * Asynchronous messages (see {@link Message#isAsynchronous} are exempt from the barrier
     * and continue to be processed as usual.
     *
     * This call must be always matched by a call to {@link #removeSyncBarrier} with
     * the same token to ensure that the message queue resumes normal operation.
     * Otherwise the application will probably hang!
     *
     * @return A token that uniquely identifies the barrier.  This token must be
     * passed to {@link #removeSyncBarrier} to release the barrier.
     *
     * @hide
     */
    public final int postSyncBarrier() {
        return mQueue.enqueueSyncBarrier(SystemClock.uptimeMillis());
    }


    /**
     * Removes a synchronization barrier.
     *
     * @param token The synchronization barrier token that was returned by
     * {@link #postSyncBarrier}.
     *
     * @throws IllegalStateException if the barrier was not found.
     *
     * @hide
     */
    public final void removeSyncBarrier(int token) {
        mQueue.removeSyncBarrier(token);
    }

    /**
+149 −115
Original line number Diff line number Diff line
@@ -30,20 +30,23 @@ import java.util.ArrayList;
 * {@link Looper#myQueue() Looper.myQueue()}.
 */
public class MessageQueue {
    // True if the message queue can be quit.
    private final boolean mQuitAllowed;

    @SuppressWarnings("unused")
    private int mPtr; // used by native code

    Message mMessages;
    private final ArrayList<IdleHandler> mIdleHandlers = new ArrayList<IdleHandler>();
    private IdleHandler[] mPendingIdleHandlers;
    private boolean mQuiting;
    boolean mQuitAllowed = true;

    // Indicates whether next() is blocked waiting in pollOnce() with a non-zero timeout.
    private boolean mBlocked;

    // Indicates the barrier nesting level.
    private int mBarrierNestCount;

    @SuppressWarnings("unused")
    private int mPtr; // used by native code
    // The next barrier token.
    // Barriers are indicated by messages with a null target whose arg1 field carries the token.
    private int mNextBarrierToken;

    private native void nativeInit();
    private native void nativeDestroy();
@@ -97,53 +100,8 @@ public class MessageQueue {
        }
    }

    /**
     * Acquires a synchronization barrier.
     *
     * While a synchronization barrier is active, only asynchronous messages are
     * permitted to execute.  Synchronous messages are retained but are not executed
     * until the synchronization barrier is released.
     *
     * This method is used to immediately postpone execution of all synchronous messages
     * until a condition is met that releases the barrier.  Asynchronous messages are
     * exempt from the barrier and continue to be executed as usual.
     *
     * This call nests and must be matched by an equal number of calls to
     * {@link #releaseSyncBarrier}.
     *
     * @hide
     */
    public final void acquireSyncBarrier() {
        synchronized (this) {
            mBarrierNestCount += 1;
        }
    }

    /**
     * Releases a synchronization barrier.
     *
     * This class undoes one invocation of {@link #acquireSyncBarrier}.
     *
     * @throws IllegalStateException if the barrier is not acquired.
     *
     * @hide
     */
    public final void releaseSyncBarrier() {
        synchronized (this) {
            if (mBarrierNestCount == 0) {
                throw new IllegalStateException("The message queue synchronization barrier "
                        + "has not been acquired.");
            }

            mBarrierNestCount -= 1;
            if (!mBlocked || mMessages == null) {
                return;
            }
        }
        nativeWake(mPtr);
    }

    MessageQueue() {
    MessageQueue(boolean quitAllowed) {
        mQuitAllowed = quitAllowed;
        nativeInit();
    }

@@ -167,26 +125,26 @@ public class MessageQueue {
            nativePollOnce(mPtr, nextPollTimeoutMillis);

            synchronized (this) {
                if (mQuiting) {
                    return null;
                }

                // Try to retrieve the next message.  Return if found.
                final long now = SystemClock.uptimeMillis();

                Message prevMsg = null;
                Message msg = mMessages;
                for (;;) {
                    if (msg == null) {
                        // No more messages.
                        nextPollTimeoutMillis = -1;
                        break;
                if (msg != null && msg.target == null) {
                    // Stalled by a barrier.  Find the next asynchronous message in the queue.
                    do {
                        prevMsg = msg;
                        msg = msg.next;
                    } while (msg != null && !msg.isAsynchronous());
                }

                    final long when = msg.when;
                    if (now < when) {
                if (msg != null) {
                    if (now < msg.when) {
                        // Next message is not ready.  Set a timeout to wake up when it is ready.
                        nextPollTimeoutMillis = (int) Math.min(when - now, Integer.MAX_VALUE);
                        break;
                    }

                    if (mBarrierNestCount == 0 || msg.isAsynchronous()) {
                        nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                    } else {
                        // Got a message.
                        mBlocked = false;
                        if (prevMsg != null) {
@@ -199,16 +157,16 @@ public class MessageQueue {
                        msg.markInUse();
                        return msg;
                    }

                    // We have a message that we could return except that it is
                    // blocked by the sync barrier.  In particular, this means that
                    // we are not idle yet, so we do not want to run the idle handlers.
                    prevMsg = msg;
                    msg = msg.next;
                } else {
                    // No more messages.
                    nextPollTimeoutMillis = -1;
                }

                // If first time idle, then get the number of idlers to run.
                if (pendingIdleHandlerCount < 0 && msg == mMessages) {
                // Idle handles only run if the queue is empty or if the first message
                // in the queue (possibly a barrier) is due to be handled in the future.
                if (pendingIdleHandlerCount < 0
                        && (mMessages == null || now < mMessages.when)) {
                    pendingIdleHandlerCount = mIdleHandlers.size();
                }
                if (pendingIdleHandlerCount <= 0) {
@@ -252,27 +210,94 @@ public class MessageQueue {
        }
    }

    final void quit() {
        if (!mQuitAllowed) {
            throw new RuntimeException("Main thread not allowed to quit.");
        }

        synchronized (this) {
            if (mQuiting) {
                return;
            }
            mQuiting = true;
        }
        nativeWake(mPtr);
    }

    final int enqueueSyncBarrier(long when) {
        // Enqueue a new sync barrier token.
        // We don't need to wake the queue because the purpose of a barrier is to stall it.
        synchronized (this) {
            final int token = mNextBarrierToken++;
            final Message msg = Message.obtain();
            msg.arg1 = token;

            Message prev = null;
            Message p = mMessages;
            if (when != 0) {
                while (p != null && p.when <= when) {
                    prev = p;
                    p = p.next;
                }
            }
            if (prev != null) { // invariant: p == prev.next
                msg.next = p;
                prev.next = msg;
            } else {
                msg.next = p;
                mMessages = msg;
            }
            return token;
        }
    }

    final void removeSyncBarrier(int token) {
        // Remove a sync barrier token from the queue.
        // If the queue is no longer stalled by a barrier then wake it.
        final boolean needWake;
        synchronized (this) {
            Message prev = null;
            Message p = mMessages;
            while (p != null && (p.target != null || p.arg1 != token)) {
                prev = p;
                p = p.next;
            }
            if (p == null) {
                throw new IllegalStateException("The specified message queue synchronization "
                        + " barrier token has not been posted or has already been removed.");
            }
            if (prev != null) {
                prev.next = p.next;
                needWake = false;
            } else {
                mMessages = p.next;
                needWake = mMessages == null || mMessages.target != null;
            }
            p.recycle();
        }
        if (needWake) {
            nativeWake(mPtr);
        }
    }

    final boolean enqueueMessage(Message msg, long when) {
        if (msg.isInUse()) {
            throw new AndroidRuntimeException(msg
                    + " This message is already in use.");
            throw new AndroidRuntimeException(msg + " This message is already in use.");
        }
        if (msg.target == null && !mQuitAllowed) {
            throw new RuntimeException("Main thread not allowed to quit");
        if (msg.target == null) {
            throw new AndroidRuntimeException("Message must have a target.");
        }
        final boolean needWake;

        boolean needWake;
        synchronized (this) {
            if (mQuiting) {
                RuntimeException e = new RuntimeException(
                        msg.target + " sending message to a Handler on a dead thread");
                Log.w("MessageQueue", e.getMessage(), e);
                return false;
            } else if (msg.target == null) {
                mQuiting = true;
            }

            msg.when = when;
            //Log.d("MessageQueue", "Enqueing: " + msg);
            Message p = mMessages;
            if (p == null || when == 0 || when < p.when) {
                // New head, wake up the event queue if blocked.
@@ -281,18 +306,22 @@ public class MessageQueue {
                needWake = mBlocked;
            } else {
                // Inserted within the middle of the queue.  Usually we don't have to wake
                // up the event queue unless the message is asynchronous and it might be
                // possible for it to be returned out of sequence relative to an earlier
                // synchronous message at the head of the queue.
                Message prev = null;
                while (p != null && p.when <= when) {
                // up the event queue unless there is a barrier at the head of the queue
                // and the message is the earliest asynchronous message in the queue.
                needWake = mBlocked && p.target == null && msg.isAsynchronous();
                Message prev;
                for (;;) {
                    prev = p;
                    p = p.next;
                    if (p == null || when < p.when) {
                        break;
                    }
                    if (needWake && p.isAsynchronous()) {
                        needWake = false;
                    }
                }
                msg.next = prev.next;
                msg.next = p; // invariant: p == prev.next
                prev.next = msg;
                needWake = mBlocked && mBarrierNestCount != 0 && msg.isAsynchronous()
                        && !mMessages.isAsynchronous();
            }
        }
        if (needWake) {
@@ -301,17 +330,34 @@ public class MessageQueue {
        return true;
    }

    final boolean removeMessages(Handler h, int what, Object object,
            boolean doRemove) {
    final boolean hasMessages(Handler h, int what, Object object) {
        if (h == null) {
            return false;
        }

        synchronized (this) {
            Message p = mMessages;
            while (p != null) {
                if (p.target == h && p.what == what && (object == null || p.obj == object)) {
                    return true;
                }
                p = p.next;
            }
            return false;
        }
    }

    final void removeMessages(Handler h, int what, Object object) {
        if (h == null) {
            return;
        }

        synchronized (this) {
            Message p = mMessages;
            boolean found = false;

            // Remove all messages at front.
            while (p != null && p.target == h && p.what == what
                   && (object == null || p.obj == object)) {
                if (!doRemove) return true;
                found = true;
                Message n = p.next;
                mMessages = n;
                p.recycle();
@@ -324,8 +370,6 @@ public class MessageQueue {
                if (n != null) {
                    if (n.target == h && n.what == what
                        && (object == null || n.obj == object)) {
                        if (!doRemove) return true;
                        found = true;
                        Message nn = n.next;
                        n.recycle();
                        p.next = nn;
@@ -334,13 +378,11 @@ public class MessageQueue {
                }
                p = n;
            }
            
            return found;
        }
    }

    final void removeMessages(Handler h, Runnable r, Object object) {
        if (r == null) {
        if (h == null || r == null) {
            return;
        }

@@ -374,6 +416,10 @@ public class MessageQueue {
    }

    final void removeCallbacksAndMessages(Handler h, Object object) {
        if (h == null) {
            return;
        }

        synchronized (this) {
            Message p = mMessages;

@@ -401,16 +447,4 @@ public class MessageQueue {
            }
        }
    }

    /*
    private void dumpQueue_l()
    {
        Message p = mMessages;
        System.out.println(this + "  queue is:");
        while (p != null) {
            System.out.println("            " + p);
            p = p.next;
        }
    }
    */
}