Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Unverified Commit ac6dc1e5 authored by Simon Chan's avatar Simon Chan
Browse files

feat(adb): support delayed ack

parent 59d78dae
Loading
Loading
Loading
Loading
+15 −2
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@ export interface AdbTransport extends Closeable {

    readonly disconnected: Promise<void>;

    readonly clientFeatures: readonly AdbFeature[];

    connect(service: string): ValueOrPromise<AdbSocket>;

    addReverseTunnel(
@@ -71,6 +73,14 @@ export class Adb implements Closeable {
        return this.transport.disconnected;
    }

    public get clientFeatures() {
        return this.transport.clientFeatures;
    }

    public get deviceFeatures() {
        return this.banner.features;
    }

    readonly subprocess: AdbSubprocess;
    readonly power: AdbPower;
    readonly reverse: AdbReverseCommand;
@@ -85,8 +95,11 @@ export class Adb implements Closeable {
        this.tcpip = new AdbTcpIpCommand(this);
    }

    supportsFeature(feature: AdbFeature): boolean {
        return this.banner.features.includes(feature);
    canUseFeature(feature: AdbFeature): boolean {
        return (
            this.clientFeatures.includes(feature) &&
            this.deviceFeatures.includes(feature)
        );
    }

    async createSocket(service: string): Promise<AdbSocket> {
+1 −1
Original line number Diff line number Diff line
@@ -47,7 +47,7 @@ type AdbShellProtocolPacket = StructValueType<typeof AdbShellProtocolPacket>;
 */
export class AdbSubprocessShellProtocol implements AdbSubprocessProtocol {
    static isSupported(adb: Adb) {
        return adb.supportsFeature(AdbFeature.ShellV2);
        return adb.canUseFeature(AdbFeature.ShellV2);
    }

    static async pty(adb: Adb, command: string) {
+5 −6
Original line number Diff line number Diff line
@@ -74,16 +74,15 @@ export class AdbSync extends AutoDisposable {
        this._adb = adb;
        this._socket = new AdbSyncSocket(socket, adb.maxPayloadSize);

        this.#supportsStat = adb.supportsFeature(AdbFeature.StatV2);
        this.#supportsListV2 = adb.supportsFeature(AdbFeature.ListV2);
        this.#fixedPushMkdir = adb.supportsFeature(AdbFeature.FixedPushMkdir);
        this.#supportsSendReceiveV2 = adb.supportsFeature(
        this.#supportsStat = adb.canUseFeature(AdbFeature.StatV2);
        this.#supportsListV2 = adb.canUseFeature(AdbFeature.ListV2);
        this.#fixedPushMkdir = adb.canUseFeature(AdbFeature.FixedPushMkdir);
        this.#supportsSendReceiveV2 = adb.canUseFeature(
            AdbFeature.SendReceiveV2,
        );
        // https://android.googlesource.com/platform/packages/modules/adb/+/91768a57b7138166e0a3d11f79cd55909dda7014/client/file_sync_client.cpp#1361
        this.#needPushMkdirWorkaround =
            this._adb.supportsFeature(AdbFeature.ShellV2) &&
            !this.fixedPushMkdir;
            this._adb.canUseFeature(AdbFeature.ShellV2) && !this.fixedPushMkdir;
    }

    /**
+88 −13
Original line number Diff line number Diff line
@@ -13,7 +13,7 @@ import {
    ConsumableWritableStream,
    WritableStream,
} from "@yume-chan/stream-extra";
import { EMPTY_UINT8_ARRAY } from "@yume-chan/struct";
import { EMPTY_UINT8_ARRAY, NumberFieldType } from "@yume-chan/struct";

import type { AdbIncomingSocketHandler, AdbSocket, Closeable } from "../adb.js";
import { decodeUtf8, encodeUtf8 } from "../utils/index.js";
@@ -32,6 +32,13 @@ export interface AdbPacketDispatcherOptions {
     */
    appendNullToServiceString: boolean;
    maxPayloadSize: number;
    /**
     * The number of bytes the device can send before receiving an ack packet.

     * Set to 0 or any negative value to disable delayed ack.
     * Otherwise the value must be in the range of unsigned 32-bit integer.
     */
    initialDelayedAckBytes: number;
    /**
     * Whether to preserve the connection open after the `AdbPacketDispatcher` is closed.
     */
@@ -39,6 +46,11 @@ export interface AdbPacketDispatcherOptions {
    debugSlowRead?: boolean | undefined;
}

interface SocketOpenResult {
    remoteId: number;
    availableWriteBytes: number;
}

/**
 * The dispatcher is the "dumb" part of the connection handling logic.
 *
@@ -79,6 +91,10 @@ export class AdbPacketDispatcher implements Closeable {
        options: AdbPacketDispatcherOptions,
    ) {
        this.options = options;
        // Don't allow negative values in dispatcher
        if (this.options.initialDelayedAckBytes < 0) {
            this.options.initialDelayedAckBytes = 0;
        }

        connection.readable
            .pipeTo(
@@ -169,15 +185,39 @@ export class AdbPacketDispatcher implements Closeable {
    }

    #handleOkay(packet: AdbPacketData) {
        if (this.#initializers.resolve(packet.arg1, packet.arg0)) {
        let ackBytes: number;
        if (this.options.initialDelayedAckBytes !== 0) {
            if (packet.payload.byteLength !== 4) {
                throw new Error(
                    "Invalid OKAY packet. Payload size should be 4",
                );
            }
            ackBytes = NumberFieldType.Uint32.deserialize(packet.payload, true);
        } else {
            if (packet.payload.byteLength !== 0) {
                throw new Error(
                    "Invalid OKAY packet. Payload size should be 0",
                );
            }
            ackBytes = Infinity;
        }

        if (
            this.#initializers.resolve(packet.arg1, {
                remoteId: packet.arg0,
                availableWriteBytes: ackBytes,
            } satisfies SocketOpenResult)
        ) {
            // Device successfully created the socket
            return;
        }

        const socket = this.#sockets.get(packet.arg1);
        if (socket) {
            // Device has received last `WRTE` to the socket
            socket.ack();
            // When delayed ack is enabled, device has received `ackBytes` from the socket.
            // When delayed ack is disabled, device has received last `WRTE` packet from the socket,
            // `ackBytes` is `Infinity` in this case.
            socket.ack(ackBytes);
            return;
        }

@@ -186,6 +226,18 @@ export class AdbPacketDispatcher implements Closeable {
        void this.sendPacket(AdbCommand.Close, packet.arg1, packet.arg0);
    }

    #sendOkay(localId: number, remoteId: number, ackBytes: number) {
        let payload: Uint8Array;
        if (this.options.initialDelayedAckBytes !== 0) {
            payload = new Uint8Array(4);
            new DataView(payload.buffer).setUint32(0, ackBytes, true);
        } else {
            payload = EMPTY_UINT8_ARRAY;
        }

        return this.sendPacket(AdbCommand.Okay, localId, remoteId, payload);
    }

    async #handleOpen(packet: AdbPacketData) {
        // `AsyncOperationManager` doesn't support skipping IDs
        // Use `add` + `resolve` to simulate this behavior
@@ -193,9 +245,20 @@ export class AdbPacketDispatcher implements Closeable {
        this.#initializers.resolve(localId, undefined);

        const remoteId = packet.arg0;
        let service = decodeUtf8(packet.payload);
        if (service.endsWith("\0")) {
            service = service.substring(0, service.length - 1);
        let initialDelayedAckBytes = packet.arg1;
        const service = decodeUtf8(packet.payload);

        if (this.options.initialDelayedAckBytes === 0) {
            if (initialDelayedAckBytes !== 0) {
                throw new Error("Invalid OPEN packet. arg1 should be 0");
            }
            initialDelayedAckBytes = Infinity;
        } else {
            if (initialDelayedAckBytes === 0) {
                throw new Error(
                    "Invalid OPEN packet. arg1 should be greater than 0",
                );
            }
        }

        const handler = this.#incomingSocketHandlers.get(service);
@@ -211,11 +274,16 @@ export class AdbPacketDispatcher implements Closeable {
            localCreated: false,
            service,
        });
        controller.ack(initialDelayedAckBytes);

        try {
            await handler(controller.socket);
            this.#sockets.set(localId, controller);
            await this.sendPacket(AdbCommand.Okay, localId, remoteId);
            await this.#sendOkay(
                localId,
                remoteId,
                this.options.initialDelayedAckBytes,
            );
        } catch (e) {
            await this.sendPacket(AdbCommand.Close, 0, remoteId);
        }
@@ -238,10 +306,10 @@ export class AdbPacketDispatcher implements Closeable {
            }),
            (async () => {
                await socket.enqueue(packet.payload);
                await this.sendPacket(
                    AdbCommand.Okay,
                await this.#sendOkay(
                    packet.arg1,
                    packet.arg0,
                    packet.payload.length,
                );
                handled = true;
            })(),
@@ -255,11 +323,17 @@ export class AdbPacketDispatcher implements Closeable {
            service += "\0";
        }

        const [localId, initializer] = this.#initializers.add<number>();
        await this.sendPacket(AdbCommand.Open, localId, 0, service);
        const [localId, initializer] =
            this.#initializers.add<SocketOpenResult>();
        await this.sendPacket(
            AdbCommand.Open,
            localId,
            this.options.initialDelayedAckBytes,
            service,
        );

        // Fulfilled by `handleOk`
        const remoteId = await initializer;
        const { remoteId, availableWriteBytes } = await initializer;
        const controller = new AdbDaemonSocketController({
            dispatcher: this,
            localId,
@@ -267,6 +341,7 @@ export class AdbPacketDispatcher implements Closeable {
            localCreated: true,
            service,
        });
        controller.ack(availableWriteBytes);
        this.#sockets.set(localId, controller);

        return controller.socket;
+42 −10
Original line number Diff line number Diff line
@@ -49,7 +49,6 @@ export class AdbDaemonSocketController
        return this.#readable;
    }

    #writePromise: PromiseResolver<void> | undefined;
    #writableController!: WritableStreamDefaultController;
    readonly writable: WritableStream<Consumable<Uint8Array>>;

@@ -65,6 +64,23 @@ export class AdbDaemonSocketController
        return this.#socket;
    }

    #availableWriteBytesChanged: PromiseResolver<void> | undefined;
    /**
     * When delayed ack is disabled, can be `Infinity` if the socket is ready to write.
     * Exactly one packet can be written no matter how large it is. Or `-1` if the socket
     * is waiting for ack.
     *
     * When delayed ack is enabled, a non-negative finite number indicates the number of
     * bytes that can be written to the socket before receiving an ack.
     */
    #availableWriteBytes = 0;
    /**
     * Gets the number of bytes that can be written to the socket without blocking.
     */
    public get availableWriteBytes() {
        return this.#availableWriteBytes;
    }

    constructor(options: AdbDaemonSocketConstructionOptions) {
        this.#dispatcher = options.dispatcher;
        this.localId = options.localId;
@@ -88,17 +104,30 @@ export class AdbDaemonSocketController
                    start < size;
                    start = end, end += chunkSize
                ) {
                    this.#writePromise = new PromiseResolver();
                    const chunk = data.subarray(start, end);
                    const length = chunk.byteLength;
                    while (this.#availableWriteBytes < length) {
                        // Only one lock is required because Web Streams API guarantees
                        // that `write` is not reentrant.
                        this.#availableWriteBytesChanged =
                            new PromiseResolver();
                        await raceSignal(
                            () => this.#availableWriteBytesChanged!.promise,
                            controller.signal,
                        );
                    }

                    if (this.#availableWriteBytes === Infinity) {
                        this.#availableWriteBytes = -1;
                    } else {
                        this.#availableWriteBytes -= length;
                    }

                    await this.#dispatcher.sendPacket(
                        AdbCommand.Write,
                        this.localId,
                        this.remoteId,
                        data.subarray(start, end),
                    );
                    // Wait for ack packet
                    await raceSignal(
                        () => this.#writePromise!.promise,
                        controller.signal,
                        chunk,
                    );
                }
            },
@@ -124,8 +153,9 @@ export class AdbDaemonSocketController
        }
    }

    ack() {
        this.#writePromise?.resolve();
    public ack(bytes: number) {
        this.#availableWriteBytes += bytes;
        this.#availableWriteBytesChanged?.resolve();
    }

    async close(): Promise<void> {
@@ -134,6 +164,8 @@ export class AdbDaemonSocketController
        }
        this.#closed = true;

        this.#availableWriteBytesChanged?.reject(new Error("Socket closed"));

        try {
            this.#writableController.error(new Error("Socket closed"));
        } catch {
Loading