Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Unverified Commit 493683b8 authored by Simon Chan's avatar Simon Chan
Browse files

refactor(adb): simplify server client

parent 8f9b8557
Loading
Loading
Loading
Loading
+158 −138
Original line number Diff line number Diff line
@@ -12,11 +12,7 @@ import {
    MaybeConsumable,
    WrapWritableStream,
} from "@yume-chan/stream-extra";
import type {
    AsyncExactReadable,
    ExactReadable,
    ValueOrPromise,
} from "@yume-chan/struct";
import type { ValueOrPromise } from "@yume-chan/struct";
import {
    EMPTY_UINT8_ARRAY,
    SyncPromise,
@@ -94,27 +90,29 @@ function sequenceEqual(a: Uint8Array, b: Uint8Array): boolean {
const OKAY = encodeUtf8("OKAY");
const FAIL = encodeUtf8("FAIL");

export class AdbServerClient {
    static readonly VERSION = 41;
class AdbServerStream {
    #connection: AdbServerConnection;
    #buffered: BufferedReadableStream;
    #writer: WritableStreamDefaultWriter<Uint8Array>;

    readonly connection: AdbServerConnector;
    constructor(connection: AdbServerConnection) {
        this.#connection = connection;
        this.#buffered = new BufferedReadableStream(connection.readable);
        this.#writer = connection.writable.getWriter();
    }

    constructor(connection: AdbServerConnector) {
        this.connection = connection;
    readExactly(length: number): ValueOrPromise<Uint8Array> {
        return this.#buffered.readExactly(length);
    }

    static readString(stream: ExactReadable): string;
    static readString(stream: AsyncExactReadable): PromiseLike<string>;
    static readString(
        stream: ExactReadable | AsyncExactReadable,
    ): string | PromiseLike<string> {
        return SyncPromise.try(() => stream.readExactly(4))
    readString() {
        return SyncPromise.try(() => this.readExactly(4))
            .then((buffer) => {
                const length = hexToNumber(buffer);
                if (length === 0) {
                    return EMPTY_UINT8_ARRAY;
                } else {
                    return stream.readExactly(length);
                    return this.readExactly(length);
                }
            })
            .then((buffer) => {
@@ -142,87 +140,99 @@ export class AdbServerClient {
            .valueOrPromise();
    }

    static async writeString(
        writer: WritableStreamDefaultWriter<Uint8Array>,
        value: string,
    ): Promise<void> {
    async writeString(value: string): Promise<void> {
        // TODO: investigate using `encodeUtf8("0000" + value)` then modifying the length
        // That way allocates a new string (hopefully only a rope) instead of a new buffer
        const encoded = encodeUtf8(value);
        const buffer = new Uint8Array(4 + encoded.length);
        write4HexDigits(buffer, 0, encoded.length);
        buffer.set(encoded, 4);
        await writer.write(buffer);
        await this.#writer.write(buffer);
    }

    static async readOkay(
        stream: ExactReadable | AsyncExactReadable,
    ): Promise<void> {
        const response = await stream.readExactly(4);
    async readOkay(): Promise<void> {
        const response = await this.readExactly(4);
        if (sequenceEqual(response, OKAY)) {
            // `OKAY` is followed by data length and data
            // But different services want to read the data differently
            // So we don't read the data here
            return;
        }

        if (sequenceEqual(response, FAIL)) {
            const reason = await AdbServerClient.readString(stream);
            const reason = await this.readString();
            throw new Error(reason);
        }

        throw new Error(`Unexpected response: ${decodeUtf8(response)}`);
    }

    release() {
        this.#writer.releaseLock();
        return {
            readable: this.#buffered.release(),
            writable: this.#connection.writable,
            closed: this.#connection.closed,
            close: () => this.#connection.close(),
        };
    }

    async dispose() {
        await this.#buffered.cancel().catch(NOOP);
        await this.#writer.close().catch(NOOP);
        try {
            await this.#connection.close();
        } catch {
            // ignore
        }
    }
}

export class AdbServerClient {
    static readonly VERSION = 41;

    readonly connection: AdbServerConnector;

    constructor(connection: AdbServerConnector) {
        this.connection = connection;
    }

    async createConnection(
        request: string,
        options?: AdbServerConnectionOptions,
    ): Promise<AdbServerConnection> {
    ): Promise<AdbServerStream> {
        const connection = await this.connection.connect(options);
        const stream = new AdbServerStream(connection);

        try {
            const writer = connection.writable.getWriter();
            await AdbServerClient.writeString(writer, request);
            writer.releaseLock();
            await stream.writeString(request);
        } catch (e) {
            await connection.readable.cancel();
            await connection.close();
            await stream.dispose();
            throw e;
        }

        const readable = new BufferedReadableStream(connection.readable);
        try {
            // `raceSignal` throws when the signal is aborted,
            // so the `catch` block can close the connection.
            await raceSignal(
                () => AdbServerClient.readOkay(readable),
                options?.signal,
            );

            return {
                readable: readable.release(),
                writable: connection.writable,
                get closed() {
                    return connection.closed;
                },
                async close() {
                    await connection.close();
                },
            };
            await raceSignal(() => stream.readOkay(), options?.signal);
            return stream;
        } catch (e) {
            await readable.cancel().catch(NOOP);
            await connection.close();
            await stream.dispose();
            throw e;
        }
    }

    /**
     * `adb version`
     */
    async getVersion(): Promise<number> {
        const connection = await this.createConnection("host:version");
        const readable = new BufferedReadableStream(connection.readable);
        try {
            const length = hexToNumber(await readable.readExactly(4));
            const version = hexToNumber(await readable.readExactly(length));
            const length = hexToNumber(await connection.readExactly(4));
            const version = hexToNumber(await connection.readExactly(length));
            return version;
        } finally {
            connection.writable.close().catch(NOOP);
            readable.cancel().catch(NOOP);
            await connection.dispose();
        }
    }

@@ -235,79 +245,84 @@ export class AdbServerClient {
        }
    }

    /**
     * `adb kill-server`
     */
    async killServer(): Promise<void> {
        const connection = await this.createConnection("host:kill");
        connection.writable.close().catch(NOOP);
        connection.readable.cancel().catch(NOOP);
        await connection.dispose();
    }

    /**
     * `adb host-features`
     */
    async getServerFeatures(): Promise<AdbFeature[]> {
        const connection = await this.createConnection("host:host-features");
        const readable = new BufferedReadableStream(connection.readable);
        try {
            const response = await AdbServerClient.readString(readable);
            const response = await connection.readString();
            return response.split(",") as AdbFeature[];
        } finally {
            connection.writable.close().catch(NOOP);
            readable.cancel().catch(NOOP);
            await connection.dispose();
        }
    }

    async pairDevice(address: string, code: string): Promise<void> {
    /**
     * `adb pair <password> <address>`
     */
    async pairDevice(address: string, password: string): Promise<void> {
        const connection = await this.createConnection(
            `host:pair:${code}:${address}`,
            `host:pair:${password}:${address}`,
        );
        const readable = new BufferedReadableStream(connection.readable);
        try {
            const response = await readable.readExactly(4);
            const response = await connection.readExactly(4);
            // `response` is either `FAIL`, or 4 hex digits for length of the string
            if (sequenceEqual(response, FAIL)) {
                throw new Error(await AdbServerClient.readString(readable));
                throw new Error(await connection.readString());
            }
            const length = hexToNumber(response);
            // Ignore the string because it's always `Successful ...`
            await readable.readExactly(length);
            await connection.readExactly(length);
        } finally {
            connection.writable.close().catch(NOOP);
            readable.cancel().catch(NOOP);
            await connection.dispose();
        }
    }

    /**
     * `adb connect <address>`
     */
    async connectDevice(address: string): Promise<void> {
        const connection = await this.createConnection(
            `host:connect:${address}`,
        );
        const readable = new BufferedReadableStream(connection.readable);
        try {
            const response = await AdbServerClient.readString(readable);
            if (response === `already connected to ${address}`) {
            const response = await connection.readString();
            switch (response) {
                case `already connected to ${address}`:
                    throw new AdbServerClient.AlreadyConnectedError(response);
            }
            if (
                response === `failed to connect to ${address}` || // `adb pair` mode not authorized
                response === `failed to authenticate to ${address}` // `adb tcpip` mode not authorized
            ) {
                case `failed to connect to ${address}`: // `adb pair` mode not authorized
                case `failed to authenticate to ${address}`: // `adb tcpip` mode not authorized
                    throw new AdbServerClient.UnauthorizedError(response);
            }
            if (response !== `connected to ${address}`) {
                case `connected to ${address}`:
                    return;
                default:
                    throw new AdbServerClient.NetworkError(response);
            }
        } finally {
            connection.writable.close().catch(NOOP);
            readable.cancel().catch(NOOP);
            await connection.dispose();
        }
    }

    /**
     * `adb disconnect <address>`
     */
    async disconnectDevice(address: string): Promise<void> {
        const connection = await this.createConnection(
            `host:disconnect:${address}`,
        );
        const readable = new BufferedReadableStream(connection.readable);
        try {
            await AdbServerClient.readString(readable);
            await connection.readString();
        } finally {
            connection.writable.close().catch(NOOP);
            readable.cancel().catch(NOOP);
            await connection.dispose();
        }
    }

@@ -361,15 +376,16 @@ export class AdbServerClient {
        return devices;
    }

    /**
     * `adb devices -l`
     */
    async getDevices(): Promise<AdbServerDevice[]> {
        const connection = await this.createConnection("host:devices-l");
        const readable = new BufferedReadableStream(connection.readable);
        try {
            const response = await AdbServerClient.readString(readable);
            const response = await connection.readString();
            return this.parseDeviceList(response);
        } finally {
            connection.writable.close().catch(NOOP);
            readable.cancel().catch(NOOP);
            await connection.dispose();
        }
    }

@@ -384,11 +400,10 @@ export class AdbServerClient {
        signal?: AbortSignal,
    ): AsyncGenerator<AdbServerDevice[], void, void> {
        const connection = await this.createConnection("host:track-devices-l");
        const readable = new BufferedReadableStream(connection.readable);
        try {
            while (true) {
                const response = await raceSignal(
                    () => AdbServerClient.readString(readable),
                    async () => await connection.readString(),
                    signal,
                );
                const devices = this.parseDeviceList(response);
@@ -399,12 +414,7 @@ export class AdbServerClient {
                return;
            }
        } finally {
            readable.cancel().catch(NOOP);
            try {
                await connection.close();
            } catch {
                // ignore
            }
            await connection.dispose();
        }
    }

@@ -427,19 +437,19 @@ export class AdbServerClient {
        throw new Error("Invalid device selector");
    }

    /**
     * `adb -s <device> reconnect` or `adb reconnect offline`
     */
    async reconnectDevice(device: AdbServerDeviceSelector | "offline") {
        const connection = await this.createConnection(
            device === "offline"
                ? "host:reconnect-offline"
                : this.formatDeviceService(device, "reconnect"),
        );
        const readable = new BufferedReadableStream(connection.readable);
        try {
            const response = await AdbServerClient.readString(readable);
            return this.parseDeviceList(response);
            await connection.readString();
        } finally {
            connection.writable.close().catch(NOOP);
            readable.cancel().catch(NOOP);
            await connection.dispose();
        }
    }

@@ -453,24 +463,34 @@ export class AdbServerClient {
    async getDeviceFeatures(
        device: AdbServerDeviceSelector,
    ): Promise<{ transportId: bigint; features: AdbFeature[] }> {
        // Usually the client sends a device command using `connectDevice`,
        // so the command got forwarded and handled by ADB daemon.
        // However, in fact, `connectDevice` only forwards unknown services to device,
        // if the service is a host command, it will still be handled by ADB server.
        // Also, if the command is about a device, but didn't specify a selector,
        // it will be executed against the device selected previously by `connectDevice`.
        // Using this method, we can get the transport ID and device features in one connection.
        const socket = await this.createDeviceConnection(
        // On paper, `host:features` is a host service (device features are cached in host),
        // so it shouldn't use `createDeviceConnection`,
        // which is used to forward the service to the device.
        //
        // However, `createDeviceConnection` is a two step process:
        //
        //    1. Send a switch device service to host, to switch the connection to the device.
        //    2. Send the actual service to host, let it forward the service to the device.
        //
        // In step 2, the host only forward the service to device if the service is unknown to host.
        // If the service is a host service, it's still handled by host.
        //
        // Even better, if the service needs a device selector, but the selector is not provided,
        // the service will be executed against the device selected by the switch device service.
        // So we can use all device selector formats for the host service,
        // and get the transport ID in the same time.
        const connection = await this.createDeviceConnection(
            device,
            "host:features",
        );
        // Luckily `AdbServerSocket` is compatible with `AdbServerConnection`
        const stream = new AdbServerStream(connection);
        try {
            const readable = new BufferedReadableStream(socket.readable);
            const featuresString = await AdbServerClient.readString(readable);
            const featuresString = await stream.readString();
            const features = featuresString.split(",") as AdbFeature[];
            return { transportId: socket.transportId, features };
            return { transportId: connection.transportId, features };
        } finally {
            await socket.close();
            await stream.dispose();
        }
    }

@@ -506,47 +526,47 @@ export class AdbServerClient {
        const connection = await this.createConnection(switchService);

        try {
            const writer = connection.writable.getWriter();
            await AdbServerClient.writeString(writer, service);
            writer.releaseLock();
            await connection.writeString(service);
        } catch (e) {
            await connection.readable.cancel();
            await connection.close();
            await connection.dispose();
            throw e;
        }

        const readable = new BufferedReadableStream(connection.readable);
        try {
            if (transportId === undefined) {
                const array = await readable.readExactly(8);
                const array = await connection.readExactly(8);
                transportId = getUint64LittleEndian(array, 0);
            }

            await AdbServerClient.readOkay(readable);
            await connection.readOkay();

            const socket = connection.release();

            return {
                transportId,
                service,
                readable: readable.release(),
                readable: socket.readable,
                writable: new WrapWritableStream(
                    connection.writable,
                    socket.writable,
                ).bePipedThroughFrom(new MaybeConsumable.UnwrapStream()),
                get closed() {
                    return connection.closed;
                    return socket.closed;
                },
                async close() {
                    await connection.close();
                    await socket.close();
                },
            };
        } catch (e) {
            await readable.cancel().catch(NOOP);
            await connection.close();
            await connection.dispose();
            throw e;
        }
    }

    /**
     * Wait for a device to be connected or disconnected.
     *
     * `adb wait-for-<state>`
     *
     * @param device The device selector
     * @param state The state to wait for
     * @param options The options
@@ -579,12 +599,12 @@ export class AdbServerClient {
            `wait-for-${type}-${state}`,
        );

        const socket = await this.createConnection(service, options);
        const readable = new BufferedReadableStream(socket.readable);
        await AdbServerClient.readOkay(readable);

        await readable.cancel();
        await socket.close();
        const connection = await this.createConnection(service, options);
        try {
            await connection.readOkay();
        } finally {
            await connection.dispose();
        }
    }

    async createTransport(