| 9 | type SendBatchFunc = (batch: string[]) => Promise<string[]>; |
| 10 | |
| 11 | class BatchClientTransport implements RpcTransport { |
| 12 | constructor(sendBatch: SendBatchFunc) { |
| 13 | this.#promise = this.#scheduleBatch(sendBatch); |
| 14 | } |
| 15 | |
| 16 | #promise: Promise<void>; |
| 17 | #aborted: any; |
| 18 | |
| 19 | #batchToSend: string[] | null = []; |
| 20 | #batchToReceive: string[] | null = null; |
| 21 | |
| 22 | send(message: string): void { |
| 23 | // If the batch was already sent, we just ignore the message, because throwing may cause the |
| 24 | // RPC system to abort prematurely. Once the last receive() is done then we'll throw an error |
| 25 | // that aborts the RPC system at the right time and will propagate to all other requests. |
| 26 | if (this.#batchToSend !== null) { |
| 27 | this.#batchToSend.push(message); |
| 28 | } |
| 29 | } |
| 30 | |
| 31 | async receive(): Promise<string> { |
| 32 | if (!this.#batchToReceive) { |
| 33 | await this.#promise; |
| 34 | } |
| 35 | |
| 36 | let msg = this.#batchToReceive!.shift(); |
| 37 | if (msg !== undefined) { |
| 38 | return msg; |
| 39 | } else { |
| 40 | // No more messages. An error thrown here will propagate out of any calls that are still |
| 41 | // open. |
| 42 | throw new Error("Batch RPC request ended."); |
| 43 | } |
| 44 | } |
| 45 | |
| 46 | abort?(reason: any): void { |
| 47 | this.#aborted = reason; |
| 48 | } |
| 49 | |
| 50 | async #scheduleBatch(sendBatch: SendBatchFunc) { |
| 51 | // Wait for microtask queue to clear before sending a batch. |
| 52 | // |
| 53 | // Note that simply waiting for one turn of the microtask queue (await Promise.resolve()) is |
| 54 | // not good enough here as the application needs a chance to call `.then()` on every RPC |
| 55 | // promise in order to explicitly indicate they want the results. Unfortunately, `await`ing |
| 56 | // a thenable does not call `.then()` immediately -- for some reason it waits for a turn of |
| 57 | // the microtask queue first, *then* calls `.then()`. |
| 58 | await new Promise(resolve => setTimeout(resolve, 0)); |
| 59 | |
| 60 | if (this.#aborted !== undefined) { |
| 61 | throw this.#aborted; |
| 62 | } |
| 63 | |
| 64 | let batch = this.#batchToSend!; |
| 65 | this.#batchToSend = null; |
| 66 | this.#batchToReceive = await sendBatch(batch); |
| 67 | } |
| 68 | } |
nothing calls this directly
no outgoing calls
no test coverage detected
searching dependent graphs…