* Drive an async-generator transform as a `{ readable, writable }` pair. * * The pair shape (rather than a `TransformStream`) is required because we * need cancellation hooks: the `Transformer.cancel` callback is in the * WHATWG Streams spec but not yet implemented in major browsers as of 2026,
(gen, flushFn)
| 118 | * @returns {{ readable: ReadableStream<any>, writable: WritableStream<any> }} |
| 119 | */ |
| 120 | function asyncGenTransform (gen, flushFn) { |
| 121 | /** @type {any[]} */ |
| 122 | const queue = [] |
| 123 | /** @type {Array<(value?: any) => void>} */ |
| 124 | const sourceWakeups = [] |
| 125 | /** @type {Array<(value?: any) => void>} */ |
| 126 | const writeWaiters = [] |
| 127 | let sourceEnded = false |
| 128 | let flushed = false |
| 129 | |
| 130 | const QUEUE_HWM = 16 |
| 131 | |
| 132 | const wakeOne = (/** @type {Array<(value?: any) => void>} */ arr) => { |
| 133 | const w = arr.shift() |
| 134 | if (w) w() |
| 135 | } |
| 136 | const wakeAll = (/** @type {Array<(value?: any) => void>} */ arr) => { |
| 137 | while (arr.length) /** @type {(value?: any) => void} */ (arr.shift())() |
| 138 | } |
| 139 | |
| 140 | // The source iterable user code consumes via `for await (const x of source)`. |
| 141 | const source = (async function * () { |
| 142 | while (true) { |
| 143 | while (queue.length > 0) { |
| 144 | yield queue.shift() |
| 145 | wakeOne(writeWaiters) |
| 146 | } |
| 147 | if (sourceEnded) return |
| 148 | await new Promise((resolve) => { sourceWakeups.push(resolve) }) |
| 149 | } |
| 150 | })() |
| 151 | |
| 152 | // Build the user's iterator once; pull() drives it on demand. |
| 153 | const iter = gen(source) |
| 154 | |
| 155 | // Controllers captured in start() so the two sides can cross-propagate |
| 156 | // termination (matching the WHATWG TransformStream behaviour: cancel on |
| 157 | // one side errors the other). |
| 158 | /** @type {ReadableStreamDefaultController<any> | null} */ |
| 159 | let readableController = null |
| 160 | /** @type {WritableStreamDefaultController | null} */ |
| 161 | let writableController = null |
| 162 | let terminating = false |
| 163 | |
| 164 | // Shared cleanup: end the source, finalize the user's generator (running |
| 165 | // its `finally`), error the opposite side. Idempotent. |
| 166 | const finalize = async (/** @type {any} */ reason, /** @type {'readable'|'writable'} */ origin) => { |
| 167 | if (terminating) return |
| 168 | terminating = true |
| 169 | sourceEnded = true |
| 170 | wakeAll(sourceWakeups) |
| 171 | wakeAll(writeWaiters) |
| 172 | if (typeof iter.return === 'function') { |
| 173 | try { await iter.return() } catch { /* user finally threw; ignored */ } |
| 174 | } |
| 175 | if (origin === 'readable' && writableController) { |
| 176 | try { writableController.error(reason) } catch { /* already errored */ } |
| 177 | } else if (origin === 'writable' && readableController) { |