Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Commit 57fc9914 authored by Simon Chan's avatar Simon Chan
Browse files

feat(adb): allow remote error to stop sync push

parent 4563a41c
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -570,6 +570,8 @@ class FileManagerState {
            );

            try {
                const start = Date.now();

                await sync.write({
                    filename: itemPath,
                    file: createFileStream(file).pipeThrough(
@@ -583,6 +585,16 @@ class FileManagerState {
                    mtime: file.lastModified / 1000,
                });

                console.log(
                    "Upload speed:",
                    (
                        ((file.size / (Date.now() - start)) * 1000) /
                        1024 /
                        1024
                    ).toFixed(2),
                    "MB/s"
                );

                runInAction(() => {
                    this.uploadSpeed =
                        this.uploadedSize - this.debouncedUploadedSize;
+13 −1
Original line number Diff line number Diff line
import { ICommandBarItemProps, Stack } from "@fluentui/react";
import { AdbFrameBuffer } from "@yume-chan/adb";
import { AdbFrameBuffer, AdbFrameBufferV2 } from "@yume-chan/adb";
import { action, autorun, computed, makeAutoObservable } from "mobx";
import { observer } from "mobx-react-lite";
import { NextPage } from "next";
@@ -47,7 +47,19 @@ const FrameBuffer: NextPage = (): JSX.Element | null => {
        }

        try {
            const start = Date.now();
            const framebuffer = await GLOBAL_STATE.device.framebuffer();
            console.log(
                "Framebuffer speed",
                (
                    (((AdbFrameBufferV2.size + framebuffer.size) /
                        (Date.now() - start)) *
                        1000) /
                    1024 /
                    1024
                ).toFixed(2),
                "MB/s"
            );
            state.setImage(framebuffer);
        } catch (e: any) {
            GLOBAL_STATE.showErrorDialog(e);
+21 −18
Original line number Diff line number Diff line
@@ -200,16 +200,26 @@ export class AdbWebUsbBackendStream
            })
        );

        const zeroMask = outEndpoint.packetSize - 1;
        this._writable = pipeFrom(
            factory.createWritable(
                new WritableStream(
                    {
                new WritableStream({
                    write: async (chunk) => {
                        try {
                            await device.transferOut(
                                outEndpoint.endpointNumber,
                                chunk
                            );

                            if (
                                zeroMask &&
                                (chunk.byteLength & zeroMask) === 0
                            ) {
                                await device.transferOut(
                                    outEndpoint.endpointNumber,
                                    EMPTY_UINT8_ARRAY
                                );
                            }
                        } catch (e) {
                            if (closed) {
                                return;
@@ -217,14 +227,7 @@ export class AdbWebUsbBackendStream
                            throw e;
                        }
                    },
                    },
                    {
                        highWaterMark: 16 * 1024,
                        size(chunk) {
                            return chunk.byteLength;
                        },
                    }
                )
                })
            ),
            new AdbPacketSerializeStream()
        );
+49 −39
Original line number Diff line number Diff line
import type { ReadableStream } from "@yume-chan/stream-extra";
import { ChunkStream, WritableStream } from "@yume-chan/stream-extra";
import {
    AbortController,
    DistributionStream,
    WritableStream,
} from "@yume-chan/stream-extra";
import Struct, { placeholder } from "@yume-chan/struct";

import { AdbSyncRequestId, adbSyncWriteRequest } from "./request.js";
import { AdbSyncResponseId, adbSyncReadResponse } from "./response.js";
import type { AdbSyncSocket } from "./socket.js";
import type { AdbSyncSocket, AdbSyncSocketLocked } from "./socket.js";
import { LinuxFileType } from "./stat.js";

const NOOP = () => {
    // no-op
};

export const ADB_SYNC_MAX_PACKET_SIZE = 64 * 1024;

export interface AdbSyncPushV1Options {
@@ -22,20 +30,17 @@ export const AdbSyncOkResponse = new Struct({ littleEndian: true }).uint32(
    "unused"
);

export async function adbSyncPushV1({
    socket,
    filename,
    file,
    mode = (LinuxFileType.File << 12) | 0o666,
    mtime = (Date.now() / 1000) | 0,
    packetSize = ADB_SYNC_MAX_PACKET_SIZE,
}: AdbSyncPushV1Options) {
    const locked = await socket.lock();
    try {
        const pathAndMode = `${filename},${mode.toString()}`;
        await adbSyncWriteRequest(locked, AdbSyncRequestId.Send, pathAndMode);

        await file.pipeThrough(new ChunkStream(packetSize, true)).pipeTo(
async function pipeFile(
    locked: AdbSyncSocketLocked,
    file: ReadableStream<Uint8Array>,
    packetSize: number,
    mtime: number
) {
    // Read and write in parallel,
    // allow error response to abort the write.
    const abortController = new AbortController();
    file.pipeThrough(new DistributionStream(packetSize, true))
        .pipeTo(
            new WritableStream({
                write: async (chunk) => {
                    await adbSyncWriteRequest(
@@ -44,15 +49,37 @@ export async function adbSyncPushV1({
                        chunk
                    );
                },
            })
        );

            }),
            { signal: abortController.signal }
        )
        .then(async () => {
            await adbSyncWriteRequest(locked, AdbSyncRequestId.Done, mtime);
            await locked.flush();
        }, NOOP);

    await adbSyncReadResponse(
        locked,
        AdbSyncResponseId.Ok,
        AdbSyncOkResponse
        );
    ).catch((e) => {
        abortController.abort();
        throw e;
    });
}

export async function adbSyncPushV1({
    socket,
    filename,
    file,
    mode = (LinuxFileType.File << 12) | 0o666,
    mtime = (Date.now() / 1000) | 0,
    packetSize = ADB_SYNC_MAX_PACKET_SIZE,
}: AdbSyncPushV1Options) {
    const locked = await socket.lock();
    try {
        const pathAndMode = `${filename},${mode.toString()}`;
        await adbSyncWriteRequest(locked, AdbSyncRequestId.Send, pathAndMode);
        await pipeFile(locked, file, packetSize, mtime);
    } finally {
        locked.release();
    }
@@ -109,24 +136,7 @@ export async function adbSyncPushV2({
            })
        );

        await file.pipeThrough(new ChunkStream(packetSize, true)).pipeTo(
            new WritableStream({
                write: async (chunk) => {
                    await adbSyncWriteRequest(
                        locked,
                        AdbSyncRequestId.Data,
                        chunk
                    );
                },
            })
        );

        await adbSyncWriteRequest(locked, AdbSyncRequestId.Done, mtime);
        await adbSyncReadResponse(
            locked,
            AdbSyncResponseId.Ok,
            AdbSyncOkResponse
        );
        await pipeFile(locked, file, packetSize, mtime);
    } finally {
        locked.release();
    }
+69 −60
Original line number Diff line number Diff line
@@ -6,12 +6,14 @@ import type { AdbSocket } from "../../index.js";
import { AutoResetEvent } from "../../index.js";

export class AdbSyncSocketLocked implements StructAsyncDeserializeStream {
    private _writer: WritableStreamDefaultWriter<Uint8Array>;
    private _readable: BufferedReadableStream;
    private _bufferSize: number;
    private _buffered: Uint8Array[] = [];
    private _bufferedLength = 0;
    private _lock: AutoResetEvent;
    private readonly _writer: WritableStreamDefaultWriter<Uint8Array>;
    private readonly _readable: BufferedReadableStream;
    private readonly _bufferCapacity: number;
    private readonly _socketLock: AutoResetEvent;
    private readonly _writeLock = new AutoResetEvent();
    private readonly _writeBuffer: Uint8Array;
    private _writeBufferOffset = 0;
    private _writeBufferAvailable;

    public constructor(
        writer: WritableStreamDefaultWriter<Uint8Array>,
@@ -21,77 +23,84 @@ export class AdbSyncSocketLocked implements StructAsyncDeserializeStream {
    ) {
        this._writer = writer;
        this._readable = readable;
        this._bufferSize = bufferSize;
        this._lock = lock;
        this._bufferCapacity = bufferSize;
        this._socketLock = lock;
        this._writeBuffer = new Uint8Array(bufferSize);
        this._writeBufferAvailable = bufferSize;
    }

    public async flush(hard: boolean) {
        if (this._bufferedLength === 0) {
    public async flush() {
        try {
            await this._writeLock.wait();
            if (this._writeBufferOffset === 0) {
                return;
            }

        if (!hard && this._bufferedLength < this._bufferSize) {
            return;
            await this._writer.write(
                this._writeBuffer.subarray(0, this._writeBufferOffset)
            );
            this._writeBufferOffset = 0;
            this._writeBufferAvailable = this._bufferCapacity;
        } finally {
            this._writeLock.notifyOne();
        }

        if (this._buffered.length === 1) {
            await this._writer.write(this._buffered[0]!);
            this._buffered.length = 0;
            this._bufferedLength = 0;
            return;
    }

        if (hard) {
            const data = new Uint8Array(this._bufferedLength);
            let offset = 0;
            for (const chunk of this._buffered) {
                data.set(chunk, offset);
                offset += chunk.byteLength;
            }
            this._buffered.length = 0;
            this._bufferedLength = 0;
            // Let AdbSocket chunk the data for us
            await this._writer.write(data);
        } else {
            while (this._bufferedLength >= this._bufferSize) {
                const data = new Uint8Array(this._bufferSize);
    public async write(data: Uint8Array) {
        try {
            await this._writeLock.wait();
            let offset = 0;
                let available = this._bufferSize;
                while (offset < this._bufferSize) {
                    const chunk = this._buffered[0]!;
                    if (chunk.byteLength <= available) {
                        data.set(chunk, offset);
                        offset += chunk.byteLength;
                        available -= chunk.byteLength;
                        this._buffered.shift();
                        this._bufferedLength -= chunk.byteLength;
            let available = data.byteLength;
            if (this._writeBufferOffset !== 0) {
                if (available >= this._writeBufferAvailable) {
                    this._writeBuffer.set(
                        data.subarray(0, this._writeBufferAvailable),
                        this._writeBufferOffset
                    );
                    offset += this._writeBufferAvailable;
                    available -= this._writeBufferAvailable;

                    await this._writer.write(this._writeBuffer);
                    this._writeBufferOffset = 0;
                    this._writeBufferAvailable = this._bufferCapacity;

                    if (available === 0) {
                        return;
                    }
                } else {
                        data.set(chunk.subarray(0, available), offset);
                        this._buffered[0] = chunk.subarray(available);
                        this._bufferedLength -= available;
                        break;
                    this._writeBuffer.set(data, this._writeBufferOffset);
                    this._writeBufferOffset += available;
                    this._writeBufferAvailable -= available;
                    return;
                }

                while (available >= this._bufferCapacity) {
                    const end = offset + this._bufferCapacity;
                    await this._writer.write(data.subarray(offset, end));
                    offset = end;
                    available -= this._bufferCapacity;
                }
                await this._writer.write(data);

                if (available > 0) {
                    this._writeBuffer.set(data.subarray(offset));
                    this._writeBufferOffset = available;
                    this._writeBufferAvailable -= available;
                }
            }
        } finally {
            this._writeLock.notifyOne();
        }

    public async write(data: Uint8Array) {
        this._buffered.push(data);
        this._bufferedLength += data.byteLength;
        await this.flush(false);
    }

    public async read(length: number) {
        await this.flush(true);
        await this.flush();
        return await this._readable.read(length);
    }

    public release(): void {
        this._buffered.length = 0;
        this._bufferedLength = 0;
        this._lock.notifyOne();
        this._writeBufferOffset = 0;
        this._writeBufferAvailable = this._bufferCapacity;
        this._socketLock.notifyOne();
    }
}

Loading