Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Unverified Commit d80ce758 authored by Simon Chan's avatar Simon Chan
Browse files

refactor(stream): build infra around consumable

parent b69e7137
Loading
Loading
Loading
Loading
+4 −3
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@ import {
import AdbWsBackend from "@yume-chan/adb-backend-ws";
import AdbWebCredentialStore from "@yume-chan/adb-credential-web";
import {
    Consumable,
    InspectStream,
    ReadableStream,
    WritableStream,
@@ -181,7 +182,7 @@ function _Connect(): JSX.Element | null {
        setConnecting(true);

        let readable: ReadableStream<AdbPacketData>;
        let writable: WritableStream<AdbPacketInit>;
        let writable: WritableStream<Consumable<AdbPacketInit>>;
        try {
            const streams = await selectedBackend.connect();

@@ -194,8 +195,8 @@ function _Connect(): JSX.Element | null {

            writable = pipeFrom(
                streams.writable,
                new InspectStream((packet: AdbPacketInit) => {
                    GLOBAL_STATE.appendLog("out", packet);
                new InspectStream((packet: Consumable<AdbPacketInit>) => {
                    GLOBAL_STATE.appendLog("out", packet.value);
                })
            );
        } catch (e: any) {
+2 −2
Original line number Diff line number Diff line
@@ -30,7 +30,7 @@ import {
import { useConst } from "@fluentui/react-hooks";
import { getIcon } from "@fluentui/style-utilities";
import { AdbFeature, LinuxFileType, type AdbSyncEntry } from "@yume-chan/adb";
import { ConsumableStream } from "@yume-chan/stream-extra";
import { WrapConsumableStream } from "@yume-chan/stream-extra";
import {
    action,
    autorun,
@@ -576,7 +576,7 @@ class FileManagerState {
                await sync.write({
                    filename: itemPath,
                    file: createFileStream(file)
                        .pipeThrough(new ConsumableStream())
                        .pipeThrough(new WrapConsumableStream())
                        .pipeThrough(
                            new ProgressStream(
                                action((uploaded) => {
+2 −2
Original line number Diff line number Diff line
@@ -8,7 +8,7 @@ import {
    PackageManager,
    PackageManagerInstallOptions,
} from "@yume-chan/android-bin";
import { ConsumableStream, WritableStream } from "@yume-chan/stream-extra";
import { WrapConsumableStream, WritableStream } from "@yume-chan/stream-extra";
import { action, makeAutoObservable, observable, runInAction } from "mobx";
import { observer } from "mobx-react-lite";
import { NextPage } from "next";
@@ -83,7 +83,7 @@ class InstallPageState {
        const log = await pm.installStream(
            file.size,
            createFileStream(file)
                .pipeThrough(new ConsumableStream())
                .pipeThrough(new WrapConsumableStream())
                .pipeThrough(
                    new ProgressStream(
                        action((uploaded) => {
+30 −28
Original line number Diff line number Diff line
@@ -3,45 +3,52 @@ import { AdbPacket, AdbPacketSerializeStream } from "@yume-chan/adb";
import type { ReadableStream, WritableStream } from "@yume-chan/stream-extra";
import {
    StructDeserializeStream,
    UnwrapConsumableStream,
    WrapReadableStream,
    WrapWritableStream,
    pipeFrom,
} from "@yume-chan/stream-extra";

declare global {
    interface TCPSocket {
        close(): Promise<void>;
    interface TCPSocketOpenInfo {
        readable: ReadableStream<Uint8Array>;
        writable: WritableStream<Uint8Array>;

        remoteAddress: string;
        remotePort: number;

        readonly remoteAddress: string;
        readonly remotePort: number;
        readonly readable: ReadableStream<Uint8Array>;
        readonly writable: WritableStream<BufferSource>;
        localAddress: string;
        localPort: number;
    }

    interface SocketOptions {
        localAddress?: string | undefined;
        localPort?: number | undefined;
    class TCPSocket {
        constructor(
            remoteAddress: string,
            remotePort: number,
            options?: TCPSocketOptions
        );

        remoteAddress: string;
        remotePort: number;
        opened: Promise<TCPSocketOpenInfo>;
        closed: Promise<void>;

        close(): Promise<void>;
    }

    interface TCPSocketOptions {
        sendBufferSize?: number;
        receiveBufferSize?: number;

        keepAlive?: number;
        noDelay?: boolean;
        keepAliveDelay?: number;
    }

    interface Navigator {
        openTCPSocket(options?: SocketOptions): Promise<TCPSocket>;
    interface Window {
        TCPSocket: typeof TCPSocket;
    }
}

export default class AdbDirectSocketsBackend implements AdbBackend {
    public static isSupported(): boolean {
        return (
            typeof window !== "undefined" && !!window.navigator?.openTCPSocket
        );
        return typeof window !== "undefined" && !!window.TCPSocket;
    }

    public readonly serial: string;
@@ -60,21 +67,16 @@ export default class AdbDirectSocketsBackend implements AdbBackend {
    }

    public async connect() {
        const { readable, writable } = await navigator.openTCPSocket({
            remoteAddress: this.host,
            remotePort: this.port,
            noDelay: true,
        });
        const socket = new TCPSocket(this.host, this.port, { noDelay: true });
        const { readable, writable } = await socket.opened;

        // Native streams can't `pipeTo()` or `pipeThrough()` polyfilled streams, so we need to wrap them
        return {
            readable: new WrapReadableStream(readable).pipeThrough(
                new StructDeserializeStream(AdbPacket)
            ),
            writable: pipeFrom(
                new WrapWritableStream(writable),
                new AdbPacketSerializeStream()
            ),
            writable: new WrapWritableStream(writable)
                .bePipedThroughFrom(new UnwrapConsumableStream())
                .bePipedThroughFrom(new AdbPacketSerializeStream()),
        };
    }
}
+14 −7
Original line number Diff line number Diff line
import type { AdbBackend, AdbPacketData, AdbPacketInit } from "@yume-chan/adb";
import { AdbPacketHeader, AdbPacketSerializeStream } from "@yume-chan/adb";
import type { ReadableWritablePair } from "@yume-chan/stream-extra";
import type {
    Consumable,
    ReadableWritablePair,
    WritableStream,
} from "@yume-chan/stream-extra";
import {
    ConsumableWritableStream,
    DuplexStreamFactory,
    ReadableStream,
    WritableStream,
    pipeFrom,
} from "@yume-chan/stream-extra";
import type { StructDeserializeStream } from "@yume-chan/struct";
@@ -114,14 +118,14 @@ class Uint8ArrayStructDeserializeStream implements StructDeserializeStream {
}

export class AdbWebUsbBackendStream
    implements ReadableWritablePair<AdbPacketData, AdbPacketInit>
    implements ReadableWritablePair<AdbPacketData, Consumable<AdbPacketInit>>
{
    private _readable: ReadableStream<AdbPacketData>;
    public get readable() {
        return this._readable;
    }

    private _writable: WritableStream<AdbPacketInit>;
    private _writable: WritableStream<Consumable<AdbPacketInit>>;
    public get writable() {
        return this._writable;
    }
@@ -134,7 +138,10 @@ export class AdbWebUsbBackendStream
    ) {
        let closed = false;

        const factory = new DuplexStreamFactory<AdbPacketData, Uint8Array>({
        const factory = new DuplexStreamFactory<
            AdbPacketData,
            Consumable<Uint8Array>
        >({
            close: async () => {
                try {
                    closed = true;
@@ -203,7 +210,7 @@ export class AdbWebUsbBackendStream
        const zeroMask = outEndpoint.packetSize - 1;
        this._writable = pipeFrom(
            factory.createWritable(
                new WritableStream({
                new ConsumableWritableStream({
                    write: async (chunk) => {
                        try {
                            await device.transferOut(
@@ -272,7 +279,7 @@ export class AdbWebUsbBackend implements AdbBackend {
     * @returns The pair of `AdbPacket` streams.
     */
    public async connect(): Promise<
        ReadableWritablePair<AdbPacketData, AdbPacketInit>
        ReadableWritablePair<AdbPacketData, Consumable<AdbPacketInit>>
    > {
        if (!this._device.opened) {
            await this._device.open();
Loading