Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Unverified Commit 6ae5f38a authored by Simon Chan's avatar Simon Chan
Browse files

feat(stream-extra): add an `error` handler to `WrapReadableStream`

parent 3e280199
Loading
Loading
Loading
Loading
+18 −12
Original line number Diff line number Diff line
@@ -13,8 +13,9 @@ export type WrapReadableStreamStart<T> = (

export interface ReadableStreamWrapper<T> {
    start: WrapReadableStreamStart<T>;
    cancel?(reason?: unknown): MaybePromiseLike<void>;
    close?(): MaybePromiseLike<void>;
    cancel?: (reason?: unknown) => MaybePromiseLike<void>;
    close?: () => MaybePromiseLike<void>;
    error?: (reason?: unknown) => MaybePromiseLike<void>;
}

function getWrappedReadableStream<T>(
@@ -57,27 +58,32 @@ export class WrapReadableStream<T> extends ReadableStream<T> {
        super(
            {
                start: async (controller) => {
                    // `start` is invoked before `ReadableStream`'s constructor finish,
                    // so using `this` synchronously causes
                    // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
                    // Queue a microtask to avoid this.
                    await Promise.resolve();

                    this.readable = await getWrappedReadableStream(
                    const readable = await getWrappedReadableStream(
                        wrapper,
                        controller,
                    );
                    // `start` is called in `super()`, so can't use `this` synchronously.
                    // but it's fine after the first `await`
                    this.readable = readable;
                    this.#reader = this.readable.getReader();
                },
                pull: async (controller) => {
                    const result = await this.#reader.read();
                    if (result.done) {
                    const { done, value } = await this.#reader
                        .read()
                        .catch((e) => {
                            if ("error" in wrapper) {
                                wrapper.error(e);
                            }
                            throw e;
                        });

                    if (done) {
                        controller.close();
                        if ("close" in wrapper) {
                            await wrapper.close?.();
                        }
                    } else {
                        controller.enqueue(result.value);
                        controller.enqueue(value);
                    }
                },
                cancel: async (reason) => {
+4 −7
Original line number Diff line number Diff line
@@ -42,13 +42,10 @@ export class WrapWritableStream<T> extends WritableStream<T> {
    ) {
        super({
            start: async () => {
                // `start` is invoked before `ReadableStream`'s constructor finish,
                // so using `this` synchronously causes
                // "Must call super constructor in derived class before accessing 'this' or returning from derived constructor".
                // Queue a microtask to avoid this.
                await Promise.resolve();

                this.writable = await getWrappedWritableStream(start);
                const writable = await getWrappedWritableStream(start);
                // `start` is called in `super()`, so can't use `this` synchronously.
                // but it's fine after the first `await`
                this.writable = writable;
                this.#writer = this.writable.getWriter();
            },
            write: async (chunk) => {