Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 48c7d27f authored by Treehugger Robot's avatar Treehugger Robot Committed by Gerrit Code Review
Browse files

Merge "Change BlockingSocketReader to use MessageQueue fd handling"

parents 34cf8b2c f840e07f
Loading
Loading
Loading
Loading
+24 −14
Original line number Diff line number Diff line
@@ -61,11 +61,11 @@ public class ConnectivityPacketTracker {
    private static final String MARK_STOP = "--- STOP ---";

    private final String mTag;
    private final Handler mHandler;
    private final LocalLog mLog;
    private final BlockingSocketReader mPacketListener;
    private boolean mRunning;

    public ConnectivityPacketTracker(NetworkInterface netif, LocalLog log) {
    public ConnectivityPacketTracker(Handler h, NetworkInterface netif, LocalLog log) {
        final String ifname;
        final int ifindex;
        final byte[] hwaddr;
@@ -81,44 +81,40 @@ public class ConnectivityPacketTracker {
        }

        mTag = TAG + "." + ifname;
        mHandler = new Handler();
        mLog = log;
        mPacketListener = new PacketListener(ifindex, hwaddr, mtu);
        mPacketListener = new PacketListener(h, ifindex, hwaddr, mtu);
    }

    public void start() {
        mLog.log(MARK_START);
        mRunning = true;
        mPacketListener.start();
    }

    public void stop() {
        mPacketListener.stop();
        mLog.log(MARK_STOP);
        mRunning = false;
    }

    private final class PacketListener extends BlockingSocketReader {
        private final int mIfIndex;
        private final byte mHwAddr[];

        PacketListener(int ifindex, byte[] hwaddr, int mtu) {
            super(mtu);
        PacketListener(Handler h, int ifindex, byte[] hwaddr, int mtu) {
            super(h, mtu);
            mIfIndex = ifindex;
            mHwAddr = hwaddr;
        }

        @Override
        protected FileDescriptor createSocket() {
        protected FileDescriptor createFd() {
            FileDescriptor s = null;
            try {
                // TODO: Evaluate switching to SOCK_DGRAM and changing the
                // BlockingSocketReader's read() to recvfrom(), so that this
                // might work on non-ethernet-like links (via SLL).
                s = Os.socket(AF_PACKET, SOCK_RAW, 0);
                NetworkUtils.attachControlPacketFilter(s, ARPHRD_ETHER);
                Os.bind(s, new PacketSocketAddress((short) ETH_P_ALL, mIfIndex));
            } catch (ErrnoException | IOException e) {
                logError("Failed to create packet tracking socket: ", e);
                closeSocket(s);
                closeFd(s);
                return null;
            }
            return s;
@@ -135,6 +131,20 @@ public class ConnectivityPacketTracker {
                        "\n[" + new String(HexEncoding.encode(recvbuf, 0, length)) + "]");
        }

        @Override
        protected void onStart() {
            mLog.log(MARK_START);
        }

        @Override
        protected void onStop() {
            if (mRunning) {
                mLog.log(MARK_STOP);
            } else {
                mLog.log(MARK_STOP + " (packet listener stopped unexpectedly)");
            }
        }

        @Override
        protected void logError(String msg, Exception e) {
            Log.e(mTag, msg, e);
@@ -142,7 +152,7 @@ public class ConnectivityPacketTracker {
        }

        private void addLogEntry(String entry) {
            mHandler.post(() -> mLog.log(entry));
            mLog.log(entry);
        }
    }
}
+2 −1
Original line number Diff line number Diff line
@@ -1515,7 +1515,8 @@ public class IpManager extends StateMachine {

        private ConnectivityPacketTracker createPacketTracker() {
            try {
                return new ConnectivityPacketTracker(mNetworkInterface, mConnectivityPacketLog);
                return new ConnectivityPacketTracker(
                        getHandler(), mNetworkInterface, mConnectivityPacketLog);
            } catch (IllegalArgumentException e) {
                return null;
            }
+155 −61
Original line number Diff line number Diff line
@@ -16,81 +16,106 @@

package android.net.util;

import static android.os.MessageQueue.OnFileDescriptorEventListener.EVENT_INPUT;
import static android.os.MessageQueue.OnFileDescriptorEventListener.EVENT_ERROR;

import android.annotation.Nullable;
import android.os.Handler;
import android.os.Looper;
import android.os.MessageQueue;
import android.os.MessageQueue.OnFileDescriptorEventListener;
import android.system.ErrnoException;
import android.system.Os;
import android.system.OsConstants;

import libcore.io.IoBridge;
import libcore.io.IoUtils;

import java.io.FileDescriptor;
import java.io.InterruptedIOException;
import java.io.IOException;


/**
 * A thread that reads from a socket and passes the received packets to a
 * subclass's handlePacket() method.  The packet receive buffer is recycled
 * on every read call, so subclasses should make any copies they would like
 * inside their handlePacket() implementation.
 * This class encapsulates the mechanics of registering a file descriptor
 * with a thread's Looper and handling read events (and errors).
 *
 * Subclasses MUST implement createFd() and SHOULD override handlePacket().

 * Subclasses can expect a call life-cycle like the following:
 *
 *     [1] start() calls createFd() and (if all goes well) onStart()
 *
 *     [2] yield, waiting for read event or error notification:
 *
 *             [a] readPacket() && handlePacket()
 *
 *             [b] if (no error):
 *                     goto 2
 *                 else:
 *                     goto 3
 *
 * All public methods may be called from any thread.
 *     [3] stop() calls onStop() if not previously stopped
 *
 * The packet receive buffer is recycled on every read call, so subclasses
 * should make any copies they would like inside their handlePacket()
 * implementation.
 *
 * All public methods MUST only be called from the same thread with which
 * the Handler constructor argument is associated.
 *
 * TODO: rename this class to something more correctly descriptive (something
 * like [or less horrible than] FdReadEventsHandler?).
 *
 * @hide
 */
public abstract class BlockingSocketReader {
    private static final int FD_EVENTS = EVENT_INPUT | EVENT_ERROR;
    private static final int UNREGISTER_THIS_FD = 0;

    public static final int DEFAULT_RECV_BUF_SIZE = 2 * 1024;

    private final Handler mHandler;
    private final MessageQueue mQueue;
    private final byte[] mPacket;
    private final Thread mThread;
    private volatile FileDescriptor mSocket;
    private volatile boolean mRunning;
    private volatile long mPacketsReceived;

    // Make it slightly easier for subclasses to properly close a socket
    // without having to know this incantation.
    public static final void closeSocket(@Nullable FileDescriptor fd) {
        try {
            IoBridge.closeAndSignalBlockedThreads(fd);
        } catch (IOException ignored) {}
    }
    private FileDescriptor mFd;
    private long mPacketsReceived;

    protected BlockingSocketReader() {
        this(DEFAULT_RECV_BUF_SIZE);
    protected static void closeFd(FileDescriptor fd) {
        IoUtils.closeQuietly(fd);
    }

    protected BlockingSocketReader(int recvbufsize) {
        if (recvbufsize < DEFAULT_RECV_BUF_SIZE) {
            recvbufsize = DEFAULT_RECV_BUF_SIZE;
        }
        mPacket = new byte[recvbufsize];
        mThread = new Thread(() -> { mainLoop(); });
    protected BlockingSocketReader(Handler h) {
        this(h, DEFAULT_RECV_BUF_SIZE);
    }

    public final boolean start() {
        if (mSocket != null) return false;

        try {
            mSocket = createSocket();
        } catch (Exception e) {
            logError("Failed to create socket: ", e);
            return false;
    protected BlockingSocketReader(Handler h, int recvbufsize) {
        mHandler = h;
        mQueue = mHandler.getLooper().getQueue();
        mPacket = new byte[Math.max(recvbufsize, DEFAULT_RECV_BUF_SIZE)];
    }

        if (mSocket == null) return false;

        mRunning = true;
        mThread.start();
        return true;
    public final void start() {
        if (onCorrectThread()) {
            createAndRegisterFd();
        } else {
            mHandler.post(() -> {
                logError("start() called from off-thread", null);
                createAndRegisterFd();
            });
        }
    }

    public final void stop() {
        mRunning = false;
        closeSocket(mSocket);
        mSocket = null;
        if (onCorrectThread()) {
            unregisterAndDestroyFd();
        } else {
            mHandler.post(() -> {
                logError("stop() called from off-thread", null);
                unregisterAndDestroyFd();
            });
        }
    }

    public final boolean isRunning() { return mRunning; }
    public final int recvBufSize() { return mPacket.length; }

    public final long numPacketsReceived() { return mPacketsReceived; }

@@ -98,11 +123,21 @@ public abstract class BlockingSocketReader {
     * Subclasses MUST create the listening socket here, including setting
     * all desired socket options, interface or address/port binding, etc.
     */
    protected abstract FileDescriptor createSocket();
    protected abstract FileDescriptor createFd();

    /**
     * Subclasses MAY override this to change the default read() implementation
     * in favour of, say, recvfrom().
     *
     * Implementations MUST return the bytes read or throw an Exception.
     */
    protected int readPacket(FileDescriptor fd, byte[] packetBuffer) throws Exception {
        return Os.read(fd, packetBuffer, 0, packetBuffer.length);
    }

    /**
     * Called by the main loop for every packet.  Any desired copies of
     * |recvbuf| should be made in here, and the underlying byte array is
     * |recvbuf| should be made in here, as the underlying byte array is
     * reused across all reads.
     */
    protected void handlePacket(byte[] recvbuf, int length) {}
@@ -113,43 +148,102 @@ public abstract class BlockingSocketReader {
    protected void logError(String msg, Exception e) {}

    /**
     * Called by the main loop just prior to exiting.
     * Called by start(), if successful, just prior to returning.
     */
    protected void onExit() {}
    protected void onStart() {}

    private final void mainLoop() {
    /**
     * Called by stop() just prior to returning.
     */
    protected void onStop() {}

    private void createAndRegisterFd() {
        if (mFd != null) return;

        try {
            mFd = createFd();
            if (mFd != null) {
                // Force the socket to be non-blocking.
                IoUtils.setBlocking(mFd, false);
            }
        } catch (Exception e) {
            logError("Failed to create socket: ", e);
            closeFd(mFd);
            mFd = null;
            return;
        }

        if (mFd == null) return;

        mQueue.addOnFileDescriptorEventListener(
                mFd,
                FD_EVENTS,
                new OnFileDescriptorEventListener() {
                    @Override
                    public int onFileDescriptorEvents(FileDescriptor fd, int events) {
                        // Always call handleInput() so read/recvfrom are given
                        // a proper chance to encounter a meaningful errno and
                        // perhaps log a useful error message.
                        if (!isRunning() || !handleInput()) {
                            unregisterAndDestroyFd();
                            return UNREGISTER_THIS_FD;
                        }
                        return FD_EVENTS;
                    }
                });
        onStart();
    }

    private boolean isRunning() { return (mFd != null) && mFd.valid(); }

    // Keep trying to read until we get EAGAIN/EWOULDBLOCK or some fatal error.
    private boolean handleInput() {
        while (isRunning()) {
            final int bytesRead;

            try {
                // Blocking read.
                // TODO: See if this can be converted to recvfrom.
                bytesRead = Os.read(mSocket, mPacket, 0, mPacket.length);
                bytesRead = readPacket(mFd, mPacket);
                if (bytesRead < 1) {
                    if (isRunning()) logError("Socket closed, exiting", null);
                    break;
                }
                mPacketsReceived++;
            } catch (ErrnoException e) {
                if (e.errno != OsConstants.EINTR) {
                    if (isRunning()) logError("read error: ", e);
                if (e.errno == OsConstants.EAGAIN) {
                    // We've read everything there is to read this time around.
                    return true;
                } else if (e.errno == OsConstants.EINTR) {
                    continue;
                } else {
                    if (isRunning()) logError("readPacket error: ", e);
                    break;
                }
                continue;
            } catch (IOException ioe) {
                if (isRunning()) logError("read error: ", ioe);
                continue;
            } catch (Exception e) {
                if (isRunning()) logError("readPacket error: ", e);
                break;
            }

            try {
                handlePacket(mPacket, bytesRead);
            } catch (Exception e) {
                logError("Unexpected exception: ", e);
                logError("handlePacket error: ", e);
                break;
            }
        }

        stop();
        onExit();
        return false;
    }

    private void unregisterAndDestroyFd() {
        if (mFd == null) return;

        mQueue.removeOnFileDescriptorEventListener(mFd);
        closeFd(mFd);
        mFd = null;
        onStop();
    }

    private boolean onCorrectThread() {
        return (mHandler.getLooper() == Looper.myLooper());
    }
}
+91 −42
Original line number Diff line number Diff line
@@ -16,8 +16,11 @@

package android.net.util;

import static android.net.util.BlockingSocketReader.DEFAULT_RECV_BUF_SIZE;
import static android.system.OsConstants.*;

import android.os.Handler;
import android.os.HandlerThread;
import android.system.ErrnoException;
import android.system.Os;
import android.system.StructTimeval;
@@ -27,6 +30,7 @@ import libcore.io.IoBridge;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.Inet6Address;
@@ -53,20 +57,17 @@ public class BlockingSocketReaderTest extends TestCase {
    protected FileDescriptor mLocalSocket;
    protected InetSocketAddress mLocalSockName;
    protected byte[] mLastRecvBuf;
    protected boolean mExited;
    protected boolean mStopped;
    protected HandlerThread mHandlerThread;
    protected BlockingSocketReader mReceiver;

    @Override
    public void setUp() {
        resetLatch();
        mLocalSocket = null;
        mLocalSockName = null;
        mLastRecvBuf = null;
        mExited = false;
    class UdpLoopbackReader extends BlockingSocketReader {
        public UdpLoopbackReader(Handler h) {
            super(h);
        }

        mReceiver = new BlockingSocketReader() {
        @Override
            protected FileDescriptor createSocket() {
        protected FileDescriptor createFd() {
            FileDescriptor s = null;
            try {
                s = Os.socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
@@ -74,7 +75,7 @@ public class BlockingSocketReaderTest extends TestCase {
                mLocalSockName = (InetSocketAddress) Os.getsockname(s);
                Os.setsockoptTimeval(s, SOL_SOCKET, SO_SNDTIMEO, TIMEO);
            } catch (ErrnoException|SocketException e) {
                    closeSocket(s);
                closeFd(s);
                fail();
                return null;
            }
@@ -90,25 +91,50 @@ public class BlockingSocketReaderTest extends TestCase {
        }

        @Override
            protected void onExit() {
                mExited = true;
        protected void onStart() {
            mStopped = false;
            mLatch.countDown();
        }

        @Override
        protected void onStop() {
            mStopped = true;
            mLatch.countDown();
        }
    };

    @Override
    public void setUp() {
        resetLatch();
        mLocalSocket = null;
        mLocalSockName = null;
        mLastRecvBuf = null;
        mStopped = false;

        mHandlerThread = new HandlerThread(BlockingSocketReaderTest.class.getSimpleName());
        mHandlerThread.start();
    }

    @Override
    public void tearDown() {
        if (mReceiver != null) mReceiver.stop();
    public void tearDown() throws Exception {
        if (mReceiver != null) {
            mHandlerThread.getThreadHandler().post(() -> { mReceiver.stop(); });
            waitForActivity();
        }
        mReceiver = null;
        mHandlerThread.quit();
        mHandlerThread = null;
    }

    void resetLatch() { mLatch = new CountDownLatch(1); }

    void waitForActivity() throws Exception {
        assertTrue(mLatch.await(500, TimeUnit.MILLISECONDS));
        try {
            mLatch.await(1000, TimeUnit.MILLISECONDS);
        } finally {
            resetLatch();
        }
    }

    void sendPacket(byte[] contents) throws Exception {
        final DatagramSocket sender = new DatagramSocket();
@@ -118,31 +144,54 @@ public class BlockingSocketReaderTest extends TestCase {
    }

    public void testBasicWorking() throws Exception {
        assertTrue(mReceiver.start());
        final Handler h = mHandlerThread.getThreadHandler();
        mReceiver = new UdpLoopbackReader(h);

        h.post(() -> { mReceiver.start(); });
        waitForActivity();
        assertTrue(mLocalSockName != null);
        assertEquals(LOOPBACK6, mLocalSockName.getAddress());
        assertTrue(0 < mLocalSockName.getPort());
        assertTrue(mLocalSocket != null);
        assertFalse(mExited);
        assertFalse(mStopped);

        final byte[] one = "one 1".getBytes("UTF-8");
        sendPacket(one);
        waitForActivity();
        assertEquals(1, mReceiver.numPacketsReceived());
        assertTrue(Arrays.equals(one, mLastRecvBuf));
        assertFalse(mExited);
        assertFalse(mStopped);

        final byte[] two = "two 2".getBytes("UTF-8");
        sendPacket(two);
        waitForActivity();
        assertEquals(2, mReceiver.numPacketsReceived());
        assertTrue(Arrays.equals(two, mLastRecvBuf));
        assertFalse(mExited);
        assertFalse(mStopped);

        mReceiver.stop();
        waitForActivity();
        assertEquals(2, mReceiver.numPacketsReceived());
        assertTrue(Arrays.equals(two, mLastRecvBuf));
        assertTrue(mExited);
        assertTrue(mStopped);
        mReceiver = null;
    }

    class NullBlockingSocketReader extends BlockingSocketReader {
        public NullBlockingSocketReader(Handler h, int recvbufsize) {
            super(h, recvbufsize);
        }

        @Override
        public FileDescriptor createFd() { return null; }
    }

    public void testMinimalRecvBufSize() throws Exception {
        final Handler h = mHandlerThread.getThreadHandler();

        for (int i : new int[]{-1, 0, 1, DEFAULT_RECV_BUF_SIZE-1}) {
            final BlockingSocketReader b = new NullBlockingSocketReader(h, i);
            assertEquals(DEFAULT_RECV_BUF_SIZE, b.recvBufSize());
        }
    }
}