Loading libraries/adb/src/daemon/socket.ts +13 −13 Original line number Original line Diff line number Diff line import { PromiseResolver } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async"; import type { Disposable } from "@yume-chan/event"; import type { Disposable } from "@yume-chan/event"; import type { import type { AbortSignal, PushReadableStreamController, PushReadableStreamController, ReadableStream, ReadableStream, WritableStream, WritableStream, Loading Loading @@ -67,12 +66,12 @@ export class AdbDaemonSocketController #availableWriteBytesChanged: PromiseResolver<void> | undefined; #availableWriteBytesChanged: PromiseResolver<void> | undefined; /** /** * When delayed ack is disabled, can be `Infinity` if the socket is ready to write. * When delayed ack is disabled, returns `Infinity` if the socket is ready to write * Exactly one packet can be written no matter how large it is. Or `-1` if the socket * (exactly one packet can be written no matter how large it is), or `-1` if the socket * is waiting for ack. * is waiting for ack message. * * * When delayed ack is enabled, a non-negative finite number indicates the number of * When delayed ack is enabled, returns a non-negative finite number indicates the number of * bytes that can be written to the socket before receiving an ack. * bytes that can be written to the socket before waiting for ack message. */ */ #availableWriteBytes = 0; #availableWriteBytes = 0; Loading @@ -90,8 +89,13 @@ export class AdbDaemonSocketController this.writable = new MaybeConsumable.WritableStream<Uint8Array>({ this.writable = new MaybeConsumable.WritableStream<Uint8Array>({ start: (controller) => { start: (controller) => { this.#writableController = controller; this.#writableController = controller; controller.signal.addEventListener("abort", () => { this.#availableWriteBytesChanged?.reject( controller.signal.reason, ); }); }, }, write: async (data, controller) => { write: async (data) => { const size = data.length; const size = data.length; const chunkSize = this.#dispatcher.options.maxPayloadSize; const chunkSize = this.#dispatcher.options.maxPayloadSize; for ( for ( Loading @@ -100,7 +104,7 @@ export class AdbDaemonSocketController start = end, end += chunkSize start = end, end += chunkSize ) { ) { const chunk = data.subarray(start, end); const chunk = data.subarray(start, end); await this.#writeChunk(chunk, controller.signal); await this.#writeChunk(chunk); } } }, }, }); }); Loading @@ -109,16 +113,12 @@ export class AdbDaemonSocketController this.#availableWriteBytes = options.availableWriteBytes; this.#availableWriteBytes = options.availableWriteBytes; } } async #writeChunk(data: Uint8Array, signal: AbortSignal) { async #writeChunk(data: Uint8Array) { const length = data.length; const length = data.length; while (this.#availableWriteBytes < length) { while (this.#availableWriteBytes < length) { // Only one lock is required because Web Streams API guarantees // Only one lock is required because Web Streams API guarantees // that `write` is not reentrant. // that `write` is not reentrant. const resolver = new PromiseResolver<void>(); const resolver = new PromiseResolver<void>(); signal.addEventListener("abort", () => { resolver.reject(signal.reason); }); this.#availableWriteBytesChanged = resolver; this.#availableWriteBytesChanged = resolver; await resolver.promise; await resolver.promise; } } Loading Loading
libraries/adb/src/daemon/socket.ts +13 −13 Original line number Original line Diff line number Diff line import { PromiseResolver } from "@yume-chan/async"; import { PromiseResolver } from "@yume-chan/async"; import type { Disposable } from "@yume-chan/event"; import type { Disposable } from "@yume-chan/event"; import type { import type { AbortSignal, PushReadableStreamController, PushReadableStreamController, ReadableStream, ReadableStream, WritableStream, WritableStream, Loading Loading @@ -67,12 +66,12 @@ export class AdbDaemonSocketController #availableWriteBytesChanged: PromiseResolver<void> | undefined; #availableWriteBytesChanged: PromiseResolver<void> | undefined; /** /** * When delayed ack is disabled, can be `Infinity` if the socket is ready to write. * When delayed ack is disabled, returns `Infinity` if the socket is ready to write * Exactly one packet can be written no matter how large it is. Or `-1` if the socket * (exactly one packet can be written no matter how large it is), or `-1` if the socket * is waiting for ack. * is waiting for ack message. * * * When delayed ack is enabled, a non-negative finite number indicates the number of * When delayed ack is enabled, returns a non-negative finite number indicates the number of * bytes that can be written to the socket before receiving an ack. * bytes that can be written to the socket before waiting for ack message. */ */ #availableWriteBytes = 0; #availableWriteBytes = 0; Loading @@ -90,8 +89,13 @@ export class AdbDaemonSocketController this.writable = new MaybeConsumable.WritableStream<Uint8Array>({ this.writable = new MaybeConsumable.WritableStream<Uint8Array>({ start: (controller) => { start: (controller) => { this.#writableController = controller; this.#writableController = controller; controller.signal.addEventListener("abort", () => { this.#availableWriteBytesChanged?.reject( controller.signal.reason, ); }); }, }, write: async (data, controller) => { write: async (data) => { const size = data.length; const size = data.length; const chunkSize = this.#dispatcher.options.maxPayloadSize; const chunkSize = this.#dispatcher.options.maxPayloadSize; for ( for ( Loading @@ -100,7 +104,7 @@ export class AdbDaemonSocketController start = end, end += chunkSize start = end, end += chunkSize ) { ) { const chunk = data.subarray(start, end); const chunk = data.subarray(start, end); await this.#writeChunk(chunk, controller.signal); await this.#writeChunk(chunk); } } }, }, }); }); Loading @@ -109,16 +113,12 @@ export class AdbDaemonSocketController this.#availableWriteBytes = options.availableWriteBytes; this.#availableWriteBytes = options.availableWriteBytes; } } async #writeChunk(data: Uint8Array, signal: AbortSignal) { async #writeChunk(data: Uint8Array) { const length = data.length; const length = data.length; while (this.#availableWriteBytes < length) { while (this.#availableWriteBytes < length) { // Only one lock is required because Web Streams API guarantees // Only one lock is required because Web Streams API guarantees // that `write` is not reentrant. // that `write` is not reentrant. const resolver = new PromiseResolver<void>(); const resolver = new PromiseResolver<void>(); signal.addEventListener("abort", () => { resolver.reject(signal.reason); }); this.#availableWriteBytesChanged = resolver; this.#availableWriteBytesChanged = resolver; await resolver.promise; await resolver.promise; } } Loading