Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 464ed18b authored by Jeff Sharkey's avatar Jeff Sharkey Committed by Sergey Nikolaienkov
Browse files

Initial pass at socketpair() based CDM transport.

Based on design conversations with the team, it's more robust to
rely on a CDM companion service to provide a "streaming" interface
between two devices, instead of building our own message
fragmentation and reassembly logic.  (Most transports such as
Bluetooth RFCOMM will already be doing their own fragmentation, so
we don't need to duplicate that work.)

Additionally, this change uses socketpair() for the bidirectional
data transfer between the companion service and the system_server,
avoiding additional Binder transaction load while handling its own
chunking, at the expense of dedicating two threads to "shovel" data
in each direction.

This change attempts to implement the design in the most
straightforward way possible to reduce risk; there are several
opportunities to optimize the thread usage using tools like
MessageQueue.addOnFileDescriptorEventListener(), but those will
need to wait for a future iteration.

Also includes basic functionality tests that exercise a PING/PONG
command to verify many different edge cases; they're currently all
passing.

Bug: 237030169
Test: atest CompanionTests
Change-Id: I075128fa60379f47400345b211a07d9a32391155
parent d3c9bdd0
Loading
Loading
Loading
Loading
+129 −0
Original line number Original line Diff line number Diff line
@@ -38,15 +38,22 @@ import android.content.IntentSender;
import android.content.pm.PackageManager;
import android.content.pm.PackageManager;
import android.net.MacAddress;
import android.net.MacAddress;
import android.os.Handler;
import android.os.Handler;
import android.os.ParcelFileDescriptor;
import android.os.RemoteException;
import android.os.RemoteException;
import android.os.UserHandle;
import android.os.UserHandle;
import android.service.notification.NotificationListenerService;
import android.service.notification.NotificationListenerService;
import android.util.ExceptionUtils;
import android.util.ExceptionUtils;
import android.util.Log;
import android.util.Log;
import android.util.SparseArray;


import com.android.internal.annotations.GuardedBy;
import com.android.internal.annotations.GuardedBy;
import com.android.internal.util.CollectionUtils;
import com.android.internal.util.CollectionUtils;


import libcore.io.IoUtils;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Collections;
import java.util.Iterator;
import java.util.Iterator;
@@ -273,6 +280,9 @@ public final class CompanionDeviceManager {
    @GuardedBy("mListeners")
    @GuardedBy("mListeners")
    private final ArrayList<OnAssociationsChangedListenerProxy> mListeners = new ArrayList<>();
    private final ArrayList<OnAssociationsChangedListenerProxy> mListeners = new ArrayList<>();


    @GuardedBy("mTransports")
    private final SparseArray<Transport> mTransports = new SparseArray<>();

    /** @hide */
    /** @hide */
    public CompanionDeviceManager(
    public CompanionDeviceManager(
            @Nullable ICompanionDeviceManager service, @NonNull Context context) {
            @Nullable ICompanionDeviceManager service, @NonNull Context context) {
@@ -800,6 +810,36 @@ public final class CompanionDeviceManager {
        }
        }
    }
    }


    /** {@hide} */
    public final void attachSystemDataTransport(int associationId, @NonNull InputStream in,
            @NonNull OutputStream out) throws DeviceNotAssociatedException {
        synchronized (mTransports) {
            if (mTransports.contains(associationId)) {
                detachSystemDataTransport(associationId);
            }

            try {
                final Transport transport = new Transport(associationId, in, out);
                mTransports.put(associationId, transport);
                transport.start();
            } catch (IOException e) {
                throw new RuntimeException("Failed to attach transport", e);
            }
        }
    }

    /** {@hide} */
    public final void detachSystemDataTransport(int associationId)
            throws DeviceNotAssociatedException {
        synchronized (mTransports) {
            final Transport transport = mTransports.get(associationId);
            if (transport != null) {
                mTransports.delete(associationId);
                transport.stop();
            }
        }
    }

    /**
    /**
     * Associates given device with given app for the given user directly, without UI prompt.
     * Associates given device with given app for the given user directly, without UI prompt.
     *
     *
@@ -1004,4 +1044,93 @@ public final class CompanionDeviceManager {
            mExecutor.execute(() -> mListener.onAssociationsChanged(associations));
            mExecutor.execute(() -> mListener.onAssociationsChanged(associations));
        }
        }
    }
    }

    /**
     * Representation of an active system data transport.
     * <p>
     * Internally uses two threads to shuttle bidirectional data between a
     * remote device and a {@code socketpair} that the system is listening to.
     * This design ensures that data payloads are transported efficiently
     * without adding Binder traffic contention.
     */
    private class Transport {
        private final int mAssociationId;
        private final InputStream mRemoteIn;
        private final OutputStream mRemoteOut;

        private InputStream mLocalIn;
        private OutputStream mLocalOut;

        private volatile boolean mStopped;

        public Transport(int associationId, InputStream remoteIn, OutputStream remoteOut) {
            mAssociationId = associationId;
            mRemoteIn = remoteIn;
            mRemoteOut = remoteOut;
        }

        public void start() throws IOException {
            final ParcelFileDescriptor[] pair = ParcelFileDescriptor.createSocketPair();
            mLocalIn = new ParcelFileDescriptor.AutoCloseInputStream(pair[0]);
            mLocalOut = new ParcelFileDescriptor.AutoCloseOutputStream(pair[0]);

            try {
                mService.attachSystemDataTransport(mContext.getPackageName(),
                        mContext.getUserId(), mAssociationId, pair[1]);
            } catch (RemoteException e) {
                throw new IOException("Failed to configure transport", e);
            }

            new Thread(() -> {
                try {
                    copyWithFlushing(mLocalIn, mRemoteOut);
                } catch (IOException e) {
                    if (!mStopped) {
                        Log.w(LOG_TAG, "Trouble during outgoing transport", e);
                        stop();
                    }
                }
            }).start();
            new Thread(() -> {
                try {
                    copyWithFlushing(mRemoteIn, mLocalOut);
                } catch (IOException e) {
                    if (!mStopped) {
                        Log.w(LOG_TAG, "Trouble during incoming transport", e);
                        stop();
                    }
                }
            }).start();
        }

        public void stop() {
            mStopped = true;

            IoUtils.closeQuietly(mRemoteIn);
            IoUtils.closeQuietly(mRemoteOut);
            IoUtils.closeQuietly(mLocalIn);
            IoUtils.closeQuietly(mLocalOut);

            try {
                mService.detachSystemDataTransport(mContext.getPackageName(),
                        mContext.getUserId(), mAssociationId);
            } catch (RemoteException e) {
                Log.w(LOG_TAG, "Failed to detach transport", e);
            }
        }

        /**
         * Copy all data from the first stream to the second stream, flushing
         * after every write to ensure that we quickly deliver all pending data.
         */
        private void copyWithFlushing(@NonNull InputStream in, @NonNull OutputStream out)
                throws IOException {
            byte[] buffer = new byte[8192];
            int c;
            while ((c = in.read(buffer)) != -1) {
                out.write(buffer, 0, c);
                out.flush();
            }
        }
    }
}
}
+42 −0
Original line number Original line Diff line number Diff line
@@ -23,11 +23,14 @@ import android.annotation.Nullable;
import android.annotation.RequiresPermission;
import android.annotation.RequiresPermission;
import android.annotation.TestApi;
import android.annotation.TestApi;
import android.app.Service;
import android.app.Service;
import android.bluetooth.BluetoothSocket;
import android.content.Intent;
import android.content.Intent;
import android.os.Handler;
import android.os.Handler;
import android.os.IBinder;
import android.os.IBinder;
import android.util.Log;
import android.util.Log;


import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.Executor;


@@ -200,6 +203,45 @@ public abstract class CompanionDeviceService extends Service {
        }
        }
    }
    }


    /**
     * Attach the given bidirectional communication streams to be used for
     * transporting system data between associated devices.
     * <p>
     * The companion service providing these streams is responsible for ensuring
     * that all data is transported accurately and in-order between the two
     * devices, including any fragmentation and re-assembly when carried over a
     * size-limited transport.
     * <p>
     * As an example, it's valid to provide streams obtained from a
     * {@link BluetoothSocket} to this method, since {@link BluetoothSocket}
     * meets the API contract described above.
     *
     * @param associationId id of the associated device
     * @param in already connected stream of data incoming from remote
     *            associated device
     * @param out already connected stream of data outgoing to remote associated
     *            device
     * @hide
     */
    public final void attachSystemDataTransport(int associationId, @NonNull InputStream in,
            @NonNull OutputStream out) throws DeviceNotAssociatedException {
        getSystemService(CompanionDeviceManager.class)
                .attachSystemDataTransport(associationId, in, out);
    }

    /**
     * Detach any bidirectional communication streams previously configured
     * through {@link #attachSystemDataTransport}.
     *
     * @param associationId id of the associated device
     * @hide
     */
    public final void detachSystemDataTransport(int associationId)
            throws DeviceNotAssociatedException {
        getSystemService(CompanionDeviceManager.class)
                .detachSystemDataTransport(associationId);
    }

    /**
    /**
     * Called by system whenever a device associated with this app is connected.
     * Called by system whenever a device associated with this app is connected.
     *
     *
+4 −0
Original line number Original line Diff line number Diff line
@@ -76,4 +76,8 @@ interface ICompanionDeviceManager {
        int associationId);
        int associationId);


    void startSystemDataTransfer(String packageName, int userId, int associationId);
    void startSystemDataTransfer(String packageName, int userId, int associationId);

    void attachSystemDataTransport(String packageName, int userId, int associationId, in ParcelFileDescriptor fd);

    void detachSystemDataTransport(String packageName, int userId, int associationId);
}
}
+1 −0
Original line number Original line Diff line number Diff line
@@ -33,6 +33,7 @@ public class CompanionTestRunner extends InstrumentationTestRunner {
    public TestSuite getAllTests() {
    public TestSuite getAllTests() {
        TestSuite suite = new InstrumentationTestSuite(this);
        TestSuite suite = new InstrumentationTestSuite(this);
        suite.addTestSuite(BluetoothDeviceFilterUtilsTest.class);
        suite.addTestSuite(BluetoothDeviceFilterUtilsTest.class);
        suite.addTestSuite(SystemDataTransportTest.class);
        return suite;
        return suite;
    }
    }


+239 −0
Original line number Original line Diff line number Diff line
/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package android.companion;

import android.os.SystemClock;
import android.test.InstrumentationTestCase;
import android.util.Log;

import com.android.internal.util.HexDump;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Random;

public class SystemDataTransportTest extends InstrumentationTestCase {
    private static final String TAG = "SystemDataTransportTest";

    private static final int COMMAND_INVALID = 0xF00DCAFE;
    private static final int COMMAND_PING_V0 = 0x50490000;
    private static final int COMMAND_PONG_V0 = 0x504F0000;

    private CompanionDeviceManager mCdm;

    @Override
    protected void setUp() throws Exception {
        super.setUp();

        mCdm = getInstrumentation().getTargetContext()
                .getSystemService(CompanionDeviceManager.class);
    }

    public void testPingHandRolled() {
        // NOTE: These packets are explicitly hand-rolled to verify wire format;
        // the remainder of the tests are fine using generated packets

        // PING v0 with payload "HELLO WORLD!"
        final byte[] input = new byte[] {
                0x50, 0x49, 0x00, 0x00,
                0x00, 0x00, 0x00, 0x0C,
                0x48, 0x45, 0x4C, 0x4C, 0x4F, 0x20, 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x21,
        };
        // PONG v0 with payload "HELLO WORLD!"
        final byte[] expected = new byte[] {
                0x50, 0x4F, 0x00, 0x00,
                0x00, 0x00, 0x00, 0x0C,
                0x48, 0x45, 0x4C, 0x4C, 0x4F, 0x20, 0x57, 0x4F, 0x52, 0x4C, 0x44, 0x21,
        };

        final ByteArrayInputStream in = new ByteArrayInputStream(input);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        mCdm.attachSystemDataTransport(42, in, out);

        final byte[] actual = waitForByteArray(out, expected.length);
        assertEquals(HexDump.toHexString(expected), HexDump.toHexString(actual));
    }

    public void testPingTrickle() {
        final byte[] input = generatePacket(COMMAND_PING_V0, TAG);
        final byte[] expected = generatePacket(COMMAND_PONG_V0, TAG);

        final ByteArrayInputStream in = new ByteArrayInputStream(input);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        mCdm.attachSystemDataTransport(42, new TrickleInputStream(in), out);

        final byte[] actual = waitForByteArray(out, expected.length);
        assertEquals(HexDump.toHexString(expected), HexDump.toHexString(actual));
    }

    public void testPingDelay() {
        final byte[] input = generatePacket(COMMAND_PING_V0, TAG);
        final byte[] expected = generatePacket(COMMAND_PONG_V0, TAG);

        final ByteArrayInputStream in = new ByteArrayInputStream(input);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        mCdm.attachSystemDataTransport(42, new DelayingInputStream(in, 1000),
                new DelayingOutputStream(out, 1000));

        final byte[] actual = waitForByteArray(out, expected.length);
        assertEquals(HexDump.toHexString(expected), HexDump.toHexString(actual));
    }

    public void testPingGiant() {
        final byte[] blob = new byte[500_000];
        new Random().nextBytes(blob);

        final byte[] input = generatePacket(COMMAND_PING_V0, blob);
        final byte[] expected = generatePacket(COMMAND_PONG_V0, blob);

        final ByteArrayInputStream in = new ByteArrayInputStream(input);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        mCdm.attachSystemDataTransport(42, in, out);

        final byte[] actual = waitForByteArray(out, expected.length);
        assertEquals(HexDump.toHexString(expected), HexDump.toHexString(actual));
    }

    public void testMutiplePingPing() {
        final byte[] input = concat(generatePacket(COMMAND_PING_V0, "red"),
                generatePacket(COMMAND_PING_V0, "green"));
        final byte[] expected = concat(generatePacket(COMMAND_PONG_V0, "red"),
                generatePacket(COMMAND_PONG_V0, "green"));

        final ByteArrayInputStream in = new ByteArrayInputStream(input);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        mCdm.attachSystemDataTransport(42, in, out);

        final byte[] actual = waitForByteArray(out, expected.length);
        assertEquals(HexDump.toHexString(expected), HexDump.toHexString(actual));
    }

    public void testMultipleInvalidPing() {
        final byte[] input = concat(generatePacket(COMMAND_INVALID, "red"),
                generatePacket(COMMAND_PING_V0, "green"));
        final byte[] expected = generatePacket(COMMAND_PONG_V0, "green");

        final ByteArrayInputStream in = new ByteArrayInputStream(input);
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        mCdm.attachSystemDataTransport(42, in, out);

        final byte[] actual = waitForByteArray(out, expected.length);
        assertEquals(HexDump.toHexString(expected), HexDump.toHexString(actual));
    }

    public void testDoubleAttach() {
        // Connect an empty connection that is stalled out
        final InputStream in = new EmptyInputStream();
        final OutputStream out = new ByteArrayOutputStream();
        mCdm.attachSystemDataTransport(42, in, out);
        SystemClock.sleep(1000);

        // Attach a second transport that has some packets; it should disconnect
        // the first transport and start replying on the second one
        testPingHandRolled();
    }

    public static byte[] concat(byte[] a, byte[] b) {
        return ByteBuffer.allocate(a.length + b.length).put(a).put(b).array();
    }

    public static byte[] generatePacket(int command, String data) {
        return generatePacket(command, data.getBytes(StandardCharsets.UTF_8));
    }

    public static byte[] generatePacket(int command, byte[] data) {
        return ByteBuffer.allocate(data.length + 8)
                .putInt(command).putInt(data.length).put(data).array();
    }

    private static byte[] waitForByteArray(ByteArrayOutputStream out, int size) {
        int i = 0;
        while (out.size() < size) {
            SystemClock.sleep(100);
            if (i++ % 10 == 0) {
                Log.w(TAG, "Waiting for data...");
            }
            if (i > 100) {
                fail();
            }
        }
        return out.toByteArray();
    }

    private static class EmptyInputStream extends InputStream {
        @Override
        public int read() throws IOException {
            throw new UnsupportedOperationException();
        }

        @Override
        public int read(byte b[], int off, int len) throws IOException {
            // Instead of hanging indefinitely, wait a bit and claim that
            // nothing was read, without hitting EOF
            SystemClock.sleep(100);
            return 0;
        }
    }

    private static class DelayingInputStream extends FilterInputStream {
        private final long mDelay;

        public DelayingInputStream(InputStream in, long delay) {
            super(in);
            mDelay = delay;
        }

        @Override
        public int read(byte b[], int off, int len) throws IOException {
            SystemClock.sleep(mDelay);
            return super.read(b, off, len);
        }
    }

    private static class DelayingOutputStream extends FilterOutputStream {
        private final long mDelay;

        public DelayingOutputStream(OutputStream out, long delay) {
            super(out);
            mDelay = delay;
        }

        @Override
        public void write(byte b[], int off, int len) throws IOException {
            SystemClock.sleep(mDelay);
            super.write(b, off, len);
        }
    }

    private static class TrickleInputStream extends FilterInputStream {
        public TrickleInputStream(InputStream in) {
            super(in);
        }

        @Override
        public int read(byte b[], int off, int len) throws IOException {
            return super.read(b, off, 1);
        }
    }
}
Loading