Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Unverified Commit 5ba850c9 authored by Simon Chan's avatar Simon Chan
Browse files

refactor(stream): create more shortcut functions

parent d80ce758
Loading
Loading
Loading
Loading
+3 −4
Original line number Diff line number Diff line
import type {
    Consumable,
    WritableStream,
    WritableStreamDefaultWriter,
} from "@yume-chan/stream-extra";
import { Consumable } from "@yume-chan/stream-extra";
import { ConsumableWritableStream } from "@yume-chan/stream-extra";

import type {
    ScrcpyOptions,
@@ -55,9 +56,7 @@ export class ScrcpyControlMessageSerializer {
    }

    private async write(data: Uint8Array) {
        const output = new Consumable(data);
        await this.writer.write(output);
        await output.consumed;
        await ConsumableWritableStream.write(this.writer, data);
    }

    public injectKeyCode(
+13 −9
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@ interface GlobalEx {
    console: Console;
}

// `createTask` allows browser DevTools to track the call stack across async boundaries.
const { console } = globalThis as unknown as GlobalEx;
const createTask: Console["createTask"] =
    console.createTask?.bind(console) ??
@@ -59,13 +60,20 @@ export class Consumable<T> {
    }
}

async function enqueue<T>(
    controller: { enqueue: (chunk: Consumable<T>) => void },
    chunk: T
) {
    const output = new Consumable(chunk);
    controller.enqueue(output);
    await output.consumed;
}

export class WrapConsumableStream<T> extends TransformStream<T, Consumable<T>> {
    public constructor() {
        super({
            async transform(chunk, controller) {
                const output = new Consumable(chunk);
                controller.enqueue(output);
                await output.consumed;
                await enqueue(controller, chunk);
            },
        });
    }
@@ -128,9 +136,7 @@ export class ConsumableReadableStream<T> extends ReadableStream<Consumable<T>> {
                async start(controller) {
                    wrappedController = {
                        async enqueue(chunk) {
                            const consumable = new Consumable(chunk);
                            controller.enqueue(consumable);
                            await consumable.consumed;
                            await enqueue(controller, chunk);
                        },
                        close() {
                            controller.close();
@@ -235,9 +241,7 @@ export class ConsumableTransformStream<I, O> extends TransformStream<
            async start(controller) {
                wrappedController = {
                    async enqueue(chunk) {
                        const consumable = new Consumable(chunk);
                        controller.enqueue(consumable);
                        await consumable.consumed;
                        await enqueue(controller, chunk);
                    },
                    close() {
                        controller.terminate();
+6 −6
Original line number Diff line number Diff line
import { describe, expect, it, jest } from "@jest/globals";

import { Consumable, ConsumableWritableStream } from "./consumable.js";
import {
    ConsumableReadableStream,
    ConsumableWritableStream,
} from "./consumable.js";
import { DistributionStream } from "./distribution.js";
import { ReadableStream } from "./stream.js";

const TestData = new Uint8Array(50);
for (let i = 0; i < 50; i += 1) {
@@ -17,14 +19,12 @@ async function testInputOutput(
    const write = jest.fn((chunk: Uint8Array) => {
        void chunk;
    });
    await new ReadableStream<Consumable<Uint8Array>>({
    await new ConsumableReadableStream<Uint8Array>({
        async start(controller) {
            let offset = 0;
            for (const length of inputLengths) {
                const end = offset + length;
                const output = new Consumable(TestData.subarray(offset, end));
                controller.enqueue(output);
                await output.consumed;
                await controller.enqueue(TestData.subarray(offset, end));
                offset = end;
            }
            controller.close();