Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit f840e07f authored by Erik Kline's avatar Erik Kline
Browse files

Change BlockingSocketReader to use MessageQueue fd handling

Convert the simplistic blocking read in a separate thread model to
the MessageQueue OnFileDescriptorEventListener model, albeit still
on a separate thread.

Test: as follows
    - built
    - flashed
    - booted
    - "runtest frameworks-net" passes
    - basic IpManager functions still work,
      including ConnectivityPacketTracker
Bug: 62476366
Bug: 67013397
Change-Id: I1f4a6707eba402338947fe3f5392a26660f05714
parent f4ec2ab7
Loading
Loading
Loading
Loading
+24 −14
Original line number Diff line number Diff line
@@ -61,11 +61,11 @@ public class ConnectivityPacketTracker {
    private static final String MARK_STOP = "--- STOP ---";

    private final String mTag;
    private final Handler mHandler;
    private final LocalLog mLog;
    private final BlockingSocketReader mPacketListener;
    private boolean mRunning;

    public ConnectivityPacketTracker(NetworkInterface netif, LocalLog log) {
    public ConnectivityPacketTracker(Handler h, NetworkInterface netif, LocalLog log) {
        final String ifname;
        final int ifindex;
        final byte[] hwaddr;
@@ -81,44 +81,40 @@ public class ConnectivityPacketTracker {
        }

        mTag = TAG + "." + ifname;
        mHandler = new Handler();
        mLog = log;
        mPacketListener = new PacketListener(ifindex, hwaddr, mtu);
        mPacketListener = new PacketListener(h, ifindex, hwaddr, mtu);
    }

    public void start() {
        mLog.log(MARK_START);
        mRunning = true;
        mPacketListener.start();
    }

    public void stop() {
        mPacketListener.stop();
        mLog.log(MARK_STOP);
        mRunning = false;
    }

    private final class PacketListener extends BlockingSocketReader {
        private final int mIfIndex;
        private final byte mHwAddr[];

        PacketListener(int ifindex, byte[] hwaddr, int mtu) {
            super(mtu);
        PacketListener(Handler h, int ifindex, byte[] hwaddr, int mtu) {
            super(h, mtu);
            mIfIndex = ifindex;
            mHwAddr = hwaddr;
        }

        @Override
        protected FileDescriptor createSocket() {
        protected FileDescriptor createFd() {
            FileDescriptor s = null;
            try {
                // TODO: Evaluate switching to SOCK_DGRAM and changing the
                // BlockingSocketReader's read() to recvfrom(), so that this
                // might work on non-ethernet-like links (via SLL).
                s = Os.socket(AF_PACKET, SOCK_RAW, 0);
                NetworkUtils.attachControlPacketFilter(s, ARPHRD_ETHER);
                Os.bind(s, new PacketSocketAddress((short) ETH_P_ALL, mIfIndex));
            } catch (ErrnoException | IOException e) {
                logError("Failed to create packet tracking socket: ", e);
                closeSocket(s);
                closeFd(s);
                return null;
            }
            return s;
@@ -135,6 +131,20 @@ public class ConnectivityPacketTracker {
                        "\n[" + new String(HexEncoding.encode(recvbuf, 0, length)) + "]");
        }

        @Override
        protected void onStart() {
            mLog.log(MARK_START);
        }

        @Override
        protected void onStop() {
            if (mRunning) {
                mLog.log(MARK_STOP);
            } else {
                mLog.log(MARK_STOP + " (packet listener stopped unexpectedly)");
            }
        }

        @Override
        protected void logError(String msg, Exception e) {
            Log.e(mTag, msg, e);
@@ -142,7 +152,7 @@ public class ConnectivityPacketTracker {
        }

        private void addLogEntry(String entry) {
            mHandler.post(() -> mLog.log(entry));
            mLog.log(entry);
        }
    }
}
+2 −1
Original line number Diff line number Diff line
@@ -1515,7 +1515,8 @@ public class IpManager extends StateMachine {

        private ConnectivityPacketTracker createPacketTracker() {
            try {
                return new ConnectivityPacketTracker(mNetworkInterface, mConnectivityPacketLog);
                return new ConnectivityPacketTracker(
                        getHandler(), mNetworkInterface, mConnectivityPacketLog);
            } catch (IllegalArgumentException e) {
                return null;
            }
+155 −61
Original line number Diff line number Diff line
@@ -16,81 +16,106 @@

package android.net.util;

import static android.os.MessageQueue.OnFileDescriptorEventListener.EVENT_INPUT;
import static android.os.MessageQueue.OnFileDescriptorEventListener.EVENT_ERROR;

import android.annotation.Nullable;
import android.os.Handler;
import android.os.Looper;
import android.os.MessageQueue;
import android.os.MessageQueue.OnFileDescriptorEventListener;
import android.system.ErrnoException;
import android.system.Os;
import android.system.OsConstants;

import libcore.io.IoBridge;
import libcore.io.IoUtils;

import java.io.FileDescriptor;
import java.io.InterruptedIOException;
import java.io.IOException;


/**
 * A thread that reads from a socket and passes the received packets to a
 * subclass's handlePacket() method.  The packet receive buffer is recycled
 * on every read call, so subclasses should make any copies they would like
 * inside their handlePacket() implementation.
 * This class encapsulates the mechanics of registering a file descriptor
 * with a thread's Looper and handling read events (and errors).
 *
 * Subclasses MUST implement createFd() and SHOULD override handlePacket().

 * Subclasses can expect a call life-cycle like the following:
 *
 *     [1] start() calls createFd() and (if all goes well) onStart()
 *
 *     [2] yield, waiting for read event or error notification:
 *
 *             [a] readPacket() && handlePacket()
 *
 *             [b] if (no error):
 *                     goto 2
 *                 else:
 *                     goto 3
 *
 * All public methods may be called from any thread.
 *     [3] stop() calls onStop() if not previously stopped
 *
 * The packet receive buffer is recycled on every read call, so subclasses
 * should make any copies they would like inside their handlePacket()
 * implementation.
 *
 * All public methods MUST only be called from the same thread with which
 * the Handler constructor argument is associated.
 *
 * TODO: rename this class to something more correctly descriptive (something
 * like [or less horrible than] FdReadEventsHandler?).
 *
 * @hide
 */
public abstract class BlockingSocketReader {
    private static final int FD_EVENTS = EVENT_INPUT | EVENT_ERROR;
    private static final int UNREGISTER_THIS_FD = 0;

    public static final int DEFAULT_RECV_BUF_SIZE = 2 * 1024;

    private final Handler mHandler;
    private final MessageQueue mQueue;
    private final byte[] mPacket;
    private final Thread mThread;
    private volatile FileDescriptor mSocket;
    private volatile boolean mRunning;
    private volatile long mPacketsReceived;

    // Make it slightly easier for subclasses to properly close a socket
    // without having to know this incantation.
    public static final void closeSocket(@Nullable FileDescriptor fd) {
        try {
            IoBridge.closeAndSignalBlockedThreads(fd);
        } catch (IOException ignored) {}
    }
    private FileDescriptor mFd;
    private long mPacketsReceived;

    protected BlockingSocketReader() {
        this(DEFAULT_RECV_BUF_SIZE);
    protected static void closeFd(FileDescriptor fd) {
        IoUtils.closeQuietly(fd);
    }

    protected BlockingSocketReader(int recvbufsize) {
        if (recvbufsize < DEFAULT_RECV_BUF_SIZE) {
            recvbufsize = DEFAULT_RECV_BUF_SIZE;
        }
        mPacket = new byte[recvbufsize];
        mThread = new Thread(() -> { mainLoop(); });
    protected BlockingSocketReader(Handler h) {
        this(h, DEFAULT_RECV_BUF_SIZE);
    }

    public final boolean start() {
        if (mSocket != null) return false;

        try {
            mSocket = createSocket();
        } catch (Exception e) {
            logError("Failed to create socket: ", e);
            return false;
    protected BlockingSocketReader(Handler h, int recvbufsize) {
        mHandler = h;
        mQueue = mHandler.getLooper().getQueue();
        mPacket = new byte[Math.max(recvbufsize, DEFAULT_RECV_BUF_SIZE)];
    }

        if (mSocket == null) return false;

        mRunning = true;
        mThread.start();
        return true;
    public final void start() {
        if (onCorrectThread()) {
            createAndRegisterFd();
        } else {
            mHandler.post(() -> {
                logError("start() called from off-thread", null);
                createAndRegisterFd();
            });
        }
    }

    public final void stop() {
        mRunning = false;
        closeSocket(mSocket);
        mSocket = null;
        if (onCorrectThread()) {
            unregisterAndDestroyFd();
        } else {
            mHandler.post(() -> {
                logError("stop() called from off-thread", null);
                unregisterAndDestroyFd();
            });
        }
    }

    public final boolean isRunning() { return mRunning; }
    public final int recvBufSize() { return mPacket.length; }

    public final long numPacketsReceived() { return mPacketsReceived; }

@@ -98,11 +123,21 @@ public abstract class BlockingSocketReader {
     * Subclasses MUST create the listening socket here, including setting
     * all desired socket options, interface or address/port binding, etc.
     */
    protected abstract FileDescriptor createSocket();
    protected abstract FileDescriptor createFd();

    /**
     * Subclasses MAY override this to change the default read() implementation
     * in favour of, say, recvfrom().
     *
     * Implementations MUST return the bytes read or throw an Exception.
     */
    protected int readPacket(FileDescriptor fd, byte[] packetBuffer) throws Exception {
        return Os.read(fd, packetBuffer, 0, packetBuffer.length);
    }

    /**
     * Called by the main loop for every packet.  Any desired copies of
     * |recvbuf| should be made in here, and the underlying byte array is
     * |recvbuf| should be made in here, as the underlying byte array is
     * reused across all reads.
     */
    protected void handlePacket(byte[] recvbuf, int length) {}
@@ -113,43 +148,102 @@ public abstract class BlockingSocketReader {
    protected void logError(String msg, Exception e) {}

    /**
     * Called by the main loop just prior to exiting.
     * Called by start(), if successful, just prior to returning.
     */
    protected void onExit() {}
    protected void onStart() {}

    private final void mainLoop() {
    /**
     * Called by stop() just prior to returning.
     */
    protected void onStop() {}

    private void createAndRegisterFd() {
        if (mFd != null) return;

        try {
            mFd = createFd();
            if (mFd != null) {
                // Force the socket to be non-blocking.
                IoUtils.setBlocking(mFd, false);
            }
        } catch (Exception e) {
            logError("Failed to create socket: ", e);
            closeFd(mFd);
            mFd = null;
            return;
        }

        if (mFd == null) return;

        mQueue.addOnFileDescriptorEventListener(
                mFd,
                FD_EVENTS,
                new OnFileDescriptorEventListener() {
                    @Override
                    public int onFileDescriptorEvents(FileDescriptor fd, int events) {
                        // Always call handleInput() so read/recvfrom are given
                        // a proper chance to encounter a meaningful errno and
                        // perhaps log a useful error message.
                        if (!isRunning() || !handleInput()) {
                            unregisterAndDestroyFd();
                            return UNREGISTER_THIS_FD;
                        }
                        return FD_EVENTS;
                    }
                });
        onStart();
    }

    private boolean isRunning() { return (mFd != null) && mFd.valid(); }

    // Keep trying to read until we get EAGAIN/EWOULDBLOCK or some fatal error.
    private boolean handleInput() {
        while (isRunning()) {
            final int bytesRead;

            try {
                // Blocking read.
                // TODO: See if this can be converted to recvfrom.
                bytesRead = Os.read(mSocket, mPacket, 0, mPacket.length);
                bytesRead = readPacket(mFd, mPacket);
                if (bytesRead < 1) {
                    if (isRunning()) logError("Socket closed, exiting", null);
                    break;
                }
                mPacketsReceived++;
            } catch (ErrnoException e) {
                if (e.errno != OsConstants.EINTR) {
                    if (isRunning()) logError("read error: ", e);
                if (e.errno == OsConstants.EAGAIN) {
                    // We've read everything there is to read this time around.
                    return true;
                } else if (e.errno == OsConstants.EINTR) {
                    continue;
                } else {
                    if (isRunning()) logError("readPacket error: ", e);
                    break;
                }
                continue;
            } catch (IOException ioe) {
                if (isRunning()) logError("read error: ", ioe);
                continue;
            } catch (Exception e) {
                if (isRunning()) logError("readPacket error: ", e);
                break;
            }

            try {
                handlePacket(mPacket, bytesRead);
            } catch (Exception e) {
                logError("Unexpected exception: ", e);
                logError("handlePacket error: ", e);
                break;
            }
        }

        stop();
        onExit();
        return false;
    }

    private void unregisterAndDestroyFd() {
        if (mFd == null) return;

        mQueue.removeOnFileDescriptorEventListener(mFd);
        closeFd(mFd);
        mFd = null;
        onStop();
    }

    private boolean onCorrectThread() {
        return (mHandler.getLooper() == Looper.myLooper());
    }
}
+91 −42
Original line number Diff line number Diff line
@@ -16,8 +16,11 @@

package android.net.util;

import static android.net.util.BlockingSocketReader.DEFAULT_RECV_BUF_SIZE;
import static android.system.OsConstants.*;

import android.os.Handler;
import android.os.HandlerThread;
import android.system.ErrnoException;
import android.system.Os;
import android.system.StructTimeval;
@@ -27,6 +30,7 @@ import libcore.io.IoBridge;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.Inet6Address;
@@ -53,20 +57,17 @@ public class BlockingSocketReaderTest extends TestCase {
    protected FileDescriptor mLocalSocket;
    protected InetSocketAddress mLocalSockName;
    protected byte[] mLastRecvBuf;
    protected boolean mExited;
    protected boolean mStopped;
    protected HandlerThread mHandlerThread;
    protected BlockingSocketReader mReceiver;

    @Override
    public void setUp() {
        resetLatch();
        mLocalSocket = null;
        mLocalSockName = null;
        mLastRecvBuf = null;
        mExited = false;
    class UdpLoopbackReader extends BlockingSocketReader {
        public UdpLoopbackReader(Handler h) {
            super(h);
        }

        mReceiver = new BlockingSocketReader() {
        @Override
            protected FileDescriptor createSocket() {
        protected FileDescriptor createFd() {
            FileDescriptor s = null;
            try {
                s = Os.socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
@@ -74,7 +75,7 @@ public class BlockingSocketReaderTest extends TestCase {
                mLocalSockName = (InetSocketAddress) Os.getsockname(s);
                Os.setsockoptTimeval(s, SOL_SOCKET, SO_SNDTIMEO, TIMEO);
            } catch (ErrnoException|SocketException e) {
                    closeSocket(s);
                closeFd(s);
                fail();
                return null;
            }
@@ -90,25 +91,50 @@ public class BlockingSocketReaderTest extends TestCase {
        }

        @Override
            protected void onExit() {
                mExited = true;
        protected void onStart() {
            mStopped = false;
            mLatch.countDown();
        }

        @Override
        protected void onStop() {
            mStopped = true;
            mLatch.countDown();
        }
    };

    @Override
    public void setUp() {
        resetLatch();
        mLocalSocket = null;
        mLocalSockName = null;
        mLastRecvBuf = null;
        mStopped = false;

        mHandlerThread = new HandlerThread(BlockingSocketReaderTest.class.getSimpleName());
        mHandlerThread.start();
    }

    @Override
    public void tearDown() {
        if (mReceiver != null) mReceiver.stop();
    public void tearDown() throws Exception {
        if (mReceiver != null) {
            mHandlerThread.getThreadHandler().post(() -> { mReceiver.stop(); });
            waitForActivity();
        }
        mReceiver = null;
        mHandlerThread.quit();
        mHandlerThread = null;
    }

    void resetLatch() { mLatch = new CountDownLatch(1); }

    void waitForActivity() throws Exception {
        assertTrue(mLatch.await(500, TimeUnit.MILLISECONDS));
        try {
            mLatch.await(1000, TimeUnit.MILLISECONDS);
        } finally {
            resetLatch();
        }
    }

    void sendPacket(byte[] contents) throws Exception {
        final DatagramSocket sender = new DatagramSocket();
@@ -118,31 +144,54 @@ public class BlockingSocketReaderTest extends TestCase {
    }

    public void testBasicWorking() throws Exception {
        assertTrue(mReceiver.start());
        final Handler h = mHandlerThread.getThreadHandler();
        mReceiver = new UdpLoopbackReader(h);

        h.post(() -> { mReceiver.start(); });
        waitForActivity();
        assertTrue(mLocalSockName != null);
        assertEquals(LOOPBACK6, mLocalSockName.getAddress());
        assertTrue(0 < mLocalSockName.getPort());
        assertTrue(mLocalSocket != null);
        assertFalse(mExited);
        assertFalse(mStopped);

        final byte[] one = "one 1".getBytes("UTF-8");
        sendPacket(one);
        waitForActivity();
        assertEquals(1, mReceiver.numPacketsReceived());
        assertTrue(Arrays.equals(one, mLastRecvBuf));
        assertFalse(mExited);
        assertFalse(mStopped);

        final byte[] two = "two 2".getBytes("UTF-8");
        sendPacket(two);
        waitForActivity();
        assertEquals(2, mReceiver.numPacketsReceived());
        assertTrue(Arrays.equals(two, mLastRecvBuf));
        assertFalse(mExited);
        assertFalse(mStopped);

        mReceiver.stop();
        waitForActivity();
        assertEquals(2, mReceiver.numPacketsReceived());
        assertTrue(Arrays.equals(two, mLastRecvBuf));
        assertTrue(mExited);
        assertTrue(mStopped);
        mReceiver = null;
    }

    class NullBlockingSocketReader extends BlockingSocketReader {
        public NullBlockingSocketReader(Handler h, int recvbufsize) {
            super(h, recvbufsize);
        }

        @Override
        public FileDescriptor createFd() { return null; }
    }

    public void testMinimalRecvBufSize() throws Exception {
        final Handler h = mHandlerThread.getThreadHandler();

        for (int i : new int[]{-1, 0, 1, DEFAULT_RECV_BUF_SIZE-1}) {
            final BlockingSocketReader b = new NullBlockingSocketReader(h, i);
            assertEquals(DEFAULT_RECV_BUF_SIZE, b.recvBufSize());
        }
    }
}