MCPcopy Index your code
hub / github.com/rvagg/through2 / asyncGenTransform

Function asyncGenTransform

web.js:120–239  ·  view source on GitHub ↗

Drive an async-generator transform as a `{ readable, writable }` pair. The pair shape (rather than a `TransformStream`) is required because we need cancellation hooks: the `Transformer.cancel` callback is in the WHATWG Streams spec but not yet implemented in major browsers as of 2026, …

(gen, flushFn)

Source from the content-addressed store, hash-verified

118 * @returns {{ readable: ReadableStream<any>, writable: WritableStream<any> }}
119 */
120function asyncGenTransform (gen, flushFn) {
121 /** @type {any[]} */
122 const queue = []
123 /** @type {Array<(value?: any) => void>} */
124 const sourceWakeups = []
125 /** @type {Array<(value?: any) => void>} */
126 const writeWaiters = []
127 let sourceEnded = false
128 let flushed = false
129
130 const QUEUE_HWM = 16
131
132 const wakeOne = (/** @type {Array<(value?: any) => void>} */ arr) => {
133 const w = arr.shift()
134 if (w) w()
135 }
136 const wakeAll = (/** @type {Array<(value?: any) => void>} */ arr) => {
137 while (arr.length) /** @type {(value?: any) => void} */ (arr.shift())()
138 }
139
140 // The source iterable user code consumes via `for await (const x of source)`.
141 const source = (async function * () {
142 while (true) {
143 while (queue.length > 0) {
144 yield queue.shift()
145 wakeOne(writeWaiters)
146 }
147 if (sourceEnded) return
148 await new Promise((resolve) => { sourceWakeups.push(resolve) })
149 }
150 })()
151
152 // Build the user's iterator once; pull() drives it on demand.
153 const iter = gen(source)
154
155 // Controllers captured in start() so the two sides can cross-propagate
156 // termination (matching the WHATWG TransformStream behaviour: cancel on
157 // one side errors the other).
158 /** @type {ReadableStreamDefaultController<any> | null} */
159 let readableController = null
160 /** @type {WritableStreamDefaultController | null} */
161 let writableController = null
162 let terminating = false
163
164 // Shared cleanup: end the source, finalize the user's generator (running
165 // its `finally`), error the opposite side. Idempotent.
166 const finalize = async (/** @type {any} */ reason, /** @type {'readable'|'writable'} */ origin) => {
167 if (terminating) return
168 terminating = true
169 sourceEnded = true
170 wakeAll(sourceWakeups)
171 wakeAll(writeWaiters)
172 if (typeof iter.return === 'function') {
173 try { await iter.return() } catch { /* user finally threw; ignored */ }
174 }
175 if (origin === 'readable' && writableController) {
176 try { writableController.error(reason) } catch { /* already errored */ }
177 } else if (origin === 'writable' && readableController) {

Callers 1

transform · Function · 0.85

Calls 1

wakeOne · Function · 0.70

Tested by

no test coverage detected