Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit e37672ba authored by Mark Fasheh's avatar Mark Fasheh
Browse files

MessageQueue: dispose() from next()

Concurrent implementation was waiting until finalize() to dispose of
mPtr and drop it's reference to the native messagequeue. But finalize()
may not be run until much later. IpClientSignatureTest#testNoFdLeaks
was failing due to open fds which were a result of Looper being kept
around and not freed. So dispose() of our pointer and reference
from next() when we are quitting our MessageQueue. This closes any
epoll control fds immediately.

Bug: 407098224
Test: atest NetworkStackIntegrationTests:android.net.ip.IpClientSignatureTest
Test: atest MessageQueueTest
Flag: EXEMPT - bugfix
Change-Id: I7c885fa2cc5cbe16116d162e050e912783c66d01
parent c9ed17d4
Loading
Loading
Loading
Loading
+89 −21
Original line number Diff line number Diff line
@@ -50,6 +50,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

/**
@@ -385,8 +386,14 @@ public final class MessageQueue {

    private boolean isPollingConcurrent() {
        // If the loop is quitting then it must not be idling.
        // We can assume mPtr != 0 when sQuitting is false.
        return !((boolean) sQuitting.getVolatile(this)) && nativeIsPolling(mPtr);
        if (!incrementQuittingState()) {
            return false;
        }
        try {
            return nativeIsPolling(mPtr);
        } finally {
            decrementQuittingState();
        }
    }

    private boolean isPollingLegacy() {
@@ -838,7 +845,9 @@ public final class MessageQueue {
                return msg;
            }

            if ((boolean) sQuitting.getVolatile(this)) {
            // Prevent any race between quit()/nativeWake() and dispose()
            if (checkQuittingAndWaitForRefsToDrop()) {
                dispose();
                return null;
            }

@@ -1028,17 +1037,22 @@ public final class MessageQueue {
        }

        if (mUseConcurrent) {
            synchronized (mIdleHandlersLock) {
                if (sQuitting.compareAndSet(this, false, true)) {
            if (!incrementQuittingState()) {
                return;
            }
            try {
                if (setQuitting()) {
                    if (safe) {
                        removeAllFutureMessages();
                    } else {
                        removeAllMessages();
                    }

                    // We can assume mPtr != 0 because sQuitting was previously false.
                    // We can assume mPtr != 0 while we hold a ref on our quitting state
                    nativeWake(mPtr);
                }
            } finally {
                decrementQuittingState();
            }
        } else {
            synchronized (this) {
@@ -2208,7 +2222,7 @@ public final class MessageQueue {
            n += dumpPriorityQueue(mAsyncPriorityQueue, pw, prefix, h, n);

            pw.println(prefix + "(Total messages: " + n + ", polling=" + isPolling()
                    + ", quitting=" + (boolean) sQuitting.getVolatile(this) + ")");
                    + ", quitting=" + getQuitting() + ")");
            return;
        }

@@ -2256,7 +2270,7 @@ public final class MessageQueue {
            dumpPriorityQueue(mAsyncPriorityQueue, proto);

            proto.write(MessageQueueProto.IS_POLLING_LOCKED, isPolling());
            proto.write(MessageQueueProto.IS_QUITTING, (boolean) sQuitting.getVolatile(this));
            proto.write(MessageQueueProto.IS_QUITTING, getQuitting());
            proto.end(messageQueueToken);
            return;
        }
@@ -2632,6 +2646,72 @@ public final class MessageQueue {

    }

    /*
     * Combine our quitting state with a ref count of access to mPtr.
     * next() doesn't want to dispose of mPtr until quit() is done processing messages.
     * isPolling() also needs to ensure safe access to mPtr.
     * So keep a ref count of access to mPtr. If quitting is set, we disallow new refs.
     * next() will only proceed with disposing of the pointer once all refs are dropped.
     */
    private static VarHandle sQuittingRefCount;
    private volatile long mQuittingRefCountValue = 0;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            sQuittingRefCount = l.findVarHandle(MessageQueue.class, "mQuittingRefCountValue",
                    long.class);
        } catch (Exception e) {
            Log.wtf(TAG_C, "VarHandle lookup failed with exception: " + e);
            throw new ExceptionInInitializerError(e);
        }
    }

    // Use MSB to indicate quitting state. Lower 63 bits hold ref count.
    private static final long QUITTING_MASK = ~(-1L >>> 1);

    private boolean incrementQuittingState() {
        long oldVal = (long)sQuittingRefCount.getAndAdd(this, 1);
        if ((oldVal & QUITTING_MASK) != 0) {
            // If we're quitting we need to drop our ref and indicate to the caller
            sQuittingRefCount.getAndAdd(this, -1);
            return false;
        }
        return true;
    }

    private void decrementQuittingState() {
        long oldVal = (long)sQuittingRefCount.getAndAdd(this, -1);
        // If quitting and we were the last ref, wake up looper thread
        if ((oldVal & QUITTING_MASK) != 0 && (oldVal & ~QUITTING_MASK) == 1L) {
            LockSupport.unpark(mLooperThread);
        }
    }

    private boolean setQuitting() {
        long oldVal = (long)sQuittingRefCount.getAndBitwiseOr(this, QUITTING_MASK);
        if ((oldVal & QUITTING_MASK) != 0) {
            return false;
        }
        return true;
    }

    private boolean getQuitting() {
        return (mQuittingRefCountValue & QUITTING_MASK) != 0;
    }

    private volatile Thread mLooperThread = null;
    // Must only be called from looper thread
    private boolean checkQuittingAndWaitForRefsToDrop() {
        if (!getQuitting()) {
            return false;
        }
        mLooperThread = Thread.currentThread();
        while ((mQuittingRefCountValue & ~QUITTING_MASK) != 0) {
            LockSupport.park();
        }
        return true;
    }

    /*
     * Tracks the number of queued and cancelled messages in our stack.
     *
@@ -2721,18 +2801,6 @@ public final class MessageQueue {
    private final Object mIdleHandlersLock = new Object();
    private final Object mFileDescriptorRecordsLock = new Object();

    private static final VarHandle sQuitting;
    private boolean mQuittingValue = false;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            sQuitting = l.findVarHandle(MessageQueue.class, "mQuittingValue", boolean.class);
        } catch (Exception e) {
            Log.wtf(TAG_C, "VarHandle lookup failed with exception: " + e);
            throw new ExceptionInInitializerError(e);
        }
    }

    // The next barrier token.
    // Barriers are indicated by messages with a null target whose arg1 field carries the token.
    private final AtomicInteger mNextBarrierTokenAtomic = new AtomicInteger(1);
@@ -2747,7 +2815,7 @@ public final class MessageQueue {
    private final Condition mDrainCompleted = mDrainingLock.newCondition();

    private boolean enqueueMessageUnchecked(@NonNull Message msg, long when) {
        if ((boolean) sQuitting.getVolatile(this)) {
        if (getQuitting()) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG_C, e.getMessage(), e);
+89 −21
Original line number Diff line number Diff line
@@ -46,6 +46,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;

/**
@@ -235,7 +236,72 @@ public final class MessageQueue {
            Log.wtf(TAG, "VarHandle lookup failed with exception: " + e);
            throw new ExceptionInInitializerError(e);
        }
    }

    /*
     * Combine our quitting state with a ref count of access to mPtr.
     * next() doesn't want to dispose of mPtr until quit() is done processing messages.
     * isPolling() also needs to ensure safe access to mPtr.
     * So keep a ref count of access to mPtr. If quitting is set, we disallow new refs.
     * next() will only proceed with disposing of the pointer once all refs are dropped.
     */
    private static VarHandle sQuittingRefCount;
    private volatile long mQuittingRefCountValue = 0;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            sQuittingRefCount = l.findVarHandle(MessageQueue.class, "mQuittingRefCountValue",
                    long.class);
        } catch (Exception e) {
            Log.wtf(TAG_C, "VarHandle lookup failed with exception: " + e);
            throw new ExceptionInInitializerError(e);
        }
    }

    // Use MSB to indicate quitting state. Lower 63 bits hold ref count.
    private static final long QUITTING_MASK = ~(-1L >>> 1);

    private boolean incrementQuittingState() {
        long oldVal = (long)sQuittingRefCount.getAndAdd(this, 1);
        if ((oldVal & QUITTING_MASK) != 0) {
            // If we're quitting we need to drop our ref and indicate to the caller
            sQuittingRefCount.getAndAdd(this, -1);
            return false;
        }
        return true;
    }

    private void decrementQuittingState() {
        long oldVal = (long)sQuittingRefCount.getAndAdd(this, -1);
        // If quitting and we were the last ref, wake up looper thread
        if ((oldVal & QUITTING_MASK) != 0 && (oldVal & ~QUITTING_MASK) == 1L) {
            LockSupport.unpark(mLooperThread);
        }
    }

    private boolean setQuitting() {
        long oldVal = (long)sQuittingRefCount.getAndBitwiseOr(this, QUITTING_MASK);
        if ((oldVal & QUITTING_MASK) != 0) {
            return false;
        }
        return true;
    }

    private boolean getQuitting() {
        return (mQuittingRefCountValue & QUITTING_MASK) != 0;
    }

    private volatile Thread mLooperThread = null;
    // Must only be called from looper thread
    private boolean checkQuittingAndWaitForRefsToDrop() {
        if (!getQuitting()) {
            return false;
        }
        mLooperThread = Thread.currentThread();
        while ((mQuittingRefCountValue & ~QUITTING_MASK) != 0) {
            LockSupport.park();
        }
        return true;
    }

    /*
@@ -333,18 +399,6 @@ public final class MessageQueue {
    @GuardedBy("mFileDescriptorRecordsLock")
    private SparseArray<FileDescriptorRecord> mFileDescriptorRecords;

    private static final VarHandle sQuitting;
    private boolean mQuittingValue = false;
    static {
        try {
            MethodHandles.Lookup l = MethodHandles.lookup();
            sQuitting = l.findVarHandle(MessageQueue.class, "mQuittingValue", boolean.class);
        } catch (Exception e) {
            Log.wtf(TAG, "VarHandle lookup failed with exception: " + e);
            throw new ExceptionInInitializerError(e);
        }
    }

    // The next barrier token.
    // Barriers are indicated by messages with a null target whose arg1 field carries the token.
    private final AtomicInteger mNextBarrierToken = new AtomicInteger(1);
@@ -509,8 +563,14 @@ public final class MessageQueue {
     */
    public boolean isPolling() {
        // If the loop is quitting then it must not be idling.
        // We can assume mPtr != 0 when sQuitting is false.
        return !((boolean) sQuitting.getVolatile(this)) && nativeIsPolling(mPtr);
        if (!incrementQuittingState()) {
            return false;
        }
        try {
            return nativeIsPolling(mPtr);
        } finally {
            decrementQuittingState();
        }
    }

    /* Helper to choose the correct queue to insert into. */
@@ -790,7 +850,9 @@ public final class MessageQueue {
                return msg;
            }

            if ((boolean) sQuitting.getVolatile(this)) {
            // Prevent any race between quit()/nativeWake() and dispose()
            if (checkQuittingAndWaitForRefsToDrop()) {
                dispose();
                return null;
            }

@@ -846,17 +908,23 @@ public final class MessageQueue {
        if (!mQuitAllowed) {
            throw new IllegalStateException("Main thread not allowed to quit.");
        }
        synchronized (mIdleHandlersLock) {
            if (sQuitting.compareAndSet(this, false, true)) {

        if (!incrementQuittingState()) {
            return;
        }
        try {
            if (setQuitting()) {
                if (safe) {
                    removeAllFutureMessages();
                } else {
                    removeAllMessages();
                }

                // We can assume mPtr != 0 because sQuitting was previously false.
                // We can assume mPtr != 0 while we hold a ref on our quitting state
                nativeWake(mPtr);
            }
        } finally {
            decrementQuittingState();
        }
    }

@@ -873,7 +941,7 @@ public final class MessageQueue {
    }

    private boolean enqueueMessageUnchecked(@NonNull Message msg, long when) {
        if ((boolean) sQuitting.getVolatile(this)) {
        if (getQuitting()) {
            IllegalStateException e = new IllegalStateException(
                    msg.target + " sending message to a Handler on a dead thread");
            Log.w(TAG, e.getMessage(), e);
@@ -1499,7 +1567,7 @@ public final class MessageQueue {
        n += dumpPriorityQueue(mAsyncPriorityQueue, pw, prefix, h, n);

        pw.println(prefix + "(Total messages: " + n + ", polling=" + isPolling()
                + ", quitting=" + (boolean) sQuitting.getVolatile(this) + ")");
                + ", quitting=" + getQuitting() + ")");
    }

    @NeverCompile
@@ -1530,7 +1598,7 @@ public final class MessageQueue {
        dumpPriorityQueue(mAsyncPriorityQueue, proto);

        proto.write(MessageQueueProto.IS_POLLING_LOCKED, isPolling());
        proto.write(MessageQueueProto.IS_QUITTING, (boolean) sQuitting.getVolatile(this));
        proto.write(MessageQueueProto.IS_QUITTING, getQuitting());
        proto.end(messageQueueToken);
    }