Donate to e Foundation | Murena handsets with /e/OS | Own a part of Murena! Learn more

Unverified Commit 190b24ad authored by Simon Chan's avatar Simon Chan
Browse files

refactor(stream): add `BufferCombiner` for more flexible uses

parent 26fe7834
Loading
Loading
Loading
Loading
+13 −58
Original line number Diff line number Diff line
import type { WritableStreamDefaultWriter } from "@yume-chan/stream-extra";
import { BufferedReadableStream, Consumable } from "@yume-chan/stream-extra";
import {
    BufferCombiner,
    BufferedReadableStream,
    Consumable,
} from "@yume-chan/stream-extra";
import type { StructAsyncDeserializeStream } from "@yume-chan/struct";

import type { AdbSocket } from "../../index.js";
@@ -10,12 +14,9 @@ export class AdbSyncSocketLocked implements StructAsyncDeserializeStream {
        Consumable<Uint8Array>
    >;
    private readonly _readable: BufferedReadableStream;
    private readonly _bufferCapacity: number;
    private readonly _socketLock: AutoResetEvent;
    private readonly _writeLock = new AutoResetEvent();
    private readonly _writeBuffer: Uint8Array;
    private _writeBufferOffset = 0;
    private _writeBufferAvailable;
    private readonly _combiner: BufferCombiner;

    public constructor(
        writer: WritableStreamDefaultWriter<Consumable<Uint8Array>>,
@@ -25,10 +26,8 @@ export class AdbSyncSocketLocked implements StructAsyncDeserializeStream {
    ) {
        this._writer = writer;
        this._readable = readable;
        this._bufferCapacity = bufferSize;
        this._socketLock = lock;
        this._writeBuffer = new Uint8Array(bufferSize);
        this._writeBufferAvailable = bufferSize;
        this._combiner = new BufferCombiner(bufferSize);
    }

    private async writeInnerStream(buffer: Uint8Array) {
@@ -40,15 +39,10 @@ export class AdbSyncSocketLocked implements StructAsyncDeserializeStream {
    public async flush() {
        try {
            await this._writeLock.wait();
            if (this._writeBufferOffset === 0) {
                return;
            const buffer = this._combiner.flush();
            if (buffer) {
                await this.writeInnerStream(buffer);
            }

            await this.writeInnerStream(
                this._writeBuffer.subarray(0, this._writeBufferOffset)
            );
            this._writeBufferOffset = 0;
            this._writeBufferAvailable = this._bufferCapacity;
        } finally {
            this._writeLock.notifyOne();
        }
@@ -57,46 +51,8 @@ export class AdbSyncSocketLocked implements StructAsyncDeserializeStream {
    public async write(data: Uint8Array) {
        try {
            await this._writeLock.wait();
            let offset = 0;
            let available = data.byteLength;
            if (this._writeBufferOffset !== 0) {
                if (available >= this._writeBufferAvailable) {
                    this._writeBuffer.set(
                        data.subarray(0, this._writeBufferAvailable),
                        this._writeBufferOffset
                    );
                    offset += this._writeBufferAvailable;
                    available -= this._writeBufferAvailable;

                    await this.writeInnerStream(this._writeBuffer);
                    this._writeBufferOffset = 0;
                    this._writeBufferAvailable = this._bufferCapacity;

                    if (available === 0) {
                        return;
                    }
                } else {
                    this._writeBuffer.set(data, this._writeBufferOffset);
                    this._writeBufferOffset += available;
                    this._writeBufferAvailable -= available;
                    return;
                }
            }

            while (available >= this._bufferCapacity) {
                const end = offset + this._bufferCapacity;
                await this.writeInnerStream(data.subarray(offset, end));
                offset = end;
                available -= this._bufferCapacity;
            }

            if (available > 0) {
                this._writeBuffer.set(
                    data.subarray(offset),
                    this._writeBufferOffset
                );
                this._writeBufferOffset += available;
                this._writeBufferAvailable -= available;
            for (const buffer of this._combiner.push(data)) {
                await this.writeInnerStream(buffer);
            }
        } finally {
            this._writeLock.notifyOne();
@@ -109,8 +65,7 @@ export class AdbSyncSocketLocked implements StructAsyncDeserializeStream {
    }

    public release(): void {
        this._writeBufferOffset = 0;
        this._writeBufferAvailable = this._bufferCapacity;
        this._combiner.flush();
        this._socketLock.notifyOne();
    }
}
+98 −60
Original line number Diff line number Diff line
import { Consumable } from "./consumable.js";
import { TransformStream } from "./stream.js";

export class DistributionStream extends TransformStream<
    Consumable<Uint8Array>,
    Consumable<Uint8Array>
> {
    public constructor(size: number, combine = false) {
        const combineBuffer = combine ? new Uint8Array(size) : undefined;
        let combineBufferOffset = 0;
        let combineBufferAvailable = size;
        super({
            async transform(chunk, controller) {
/**
 * Splits or combines buffers to specified size.
 */
export class BufferCombiner {
    private _capacity: number;
    private readonly _buffer: Uint8Array;
    private _offset: number;
    private _available: number;

    public constructor(size: number) {
        this._capacity = size;
        this._buffer = new Uint8Array(size);
        this._offset = 0;
        this._available = size;
    }

    /**
     * Pushes data to the combiner.
     * @param data The input data to be split or combined.
     * @returns
     * A generator that yields buffers of specified size.
     * It may yield the same buffer multiple times, consume the data before calling `next`.
     */
    public *push(data: Uint8Array): Generator<Uint8Array, void, void> {
        let offset = 0;
                let available = chunk.value.byteLength;
        let available = data.byteLength;

                if (combineBuffer && combineBufferOffset !== 0) {
                    if (available >= combineBufferAvailable) {
                        combineBuffer.set(
                            chunk.value.subarray(0, combineBufferAvailable),
                            combineBufferOffset
        if (this._offset !== 0) {
            if (available >= this._available) {
                this._buffer.set(
                    data.subarray(0, this._available),
                    this._offset
                );
                        offset += combineBufferAvailable;
                        available -= combineBufferAvailable;

                        const output = new Consumable(combineBuffer);
                        controller.enqueue(output);
                        await output.consumed;
                offset += this._available;
                available -= this._available;

                        combineBufferOffset = 0;
                        combineBufferAvailable = size;
                yield this._buffer;
                this._offset = 0;
                this._available = this._capacity;

                if (available === 0) {
                            chunk.consume();
                    return;
                }
            } else {
                        combineBuffer.set(chunk.value, combineBufferOffset);
                        combineBufferOffset += available;
                        combineBufferAvailable -= available;
                        chunk.consume();
                this._buffer.set(data, this._offset);
                this._offset += available;
                this._available -= available;
                return;
            }
        }

                while (available >= size) {
                    const end = offset + size;

                    const output = new Consumable(
                        chunk.value.subarray(offset, end)
                    );
                    controller.enqueue(output);
                    await output.consumed;

        while (available >= this._capacity) {
            const end = offset + this._capacity;
            yield data.subarray(offset, end);
            offset = end;
                    available -= size;
            available -= this._capacity;
        }

        if (available > 0) {
                    if (combineBuffer) {
                        combineBuffer.set(
                            chunk.value.subarray(offset),
                            combineBufferOffset
                        );
                        combineBufferOffset += available;
                        combineBufferAvailable -= available;
            this._buffer.set(data.subarray(offset), this._offset);
            this._offset += available;
            this._available -= available;
        }
    }

    public flush(): Uint8Array | undefined {
        if (this._offset === 0) {
            return undefined;
        }

        const output = this._buffer.subarray(0, this._offset);
        this._offset = 0;
        this._available = this._capacity;
        return output;
    }
}

export class DistributionStream extends TransformStream<
    Consumable<Uint8Array>,
    Consumable<Uint8Array>
> {
    public constructor(size: number, combine = false) {
        const combiner = combine ? new BufferCombiner(size) : undefined;
        super({
            async transform(chunk, controller) {
                if (combiner) {
                    for (const buffer of combiner.push(chunk.value)) {
                        const output = new Consumable(buffer);
                        controller.enqueue(output);
                        await output.consumed;
                    }
                } else {
                    const data = chunk.value;
                    let offset = 0;
                    let available = data.byteLength;
                    while (available > 0) {
                        const end = offset + size;

                        const output = new Consumable(
                            chunk.value.subarray(offset)
                            data.subarray(offset, end)
                        );
                        controller.enqueue(output);
                        await output.consumed;

                        offset = end;
                        available -= size;
                    }
                }

                chunk.consume();
            },
            async flush(controller) {
                if (combineBuffer && combineBufferOffset !== 0) {
                    const output = new Consumable(
                        combineBuffer.subarray(0, combineBufferOffset)
                    );
                if (combiner) {
                    const data = combiner.flush();
                    if (data) {
                        const output = new Consumable(data);
                        controller.enqueue(output);
                        await output.consumed;
                    }
                }
            },
        });
    }