* Create an async pipeline from source through transforms.
* @yields {Uint8Array[]}
(source, transforms, signal)
| 770 | * @yields {Uint8Array[]} |
| 771 | */ |
| 772 | async function* createAsyncPipeline(source, transforms, signal) { |
| 773 | // Check for abort |
| 774 | signal?.throwIfAborted(); |
| 775 | |
| 776 | // Fast path: no transforms, just yield normalized source directly |
| 777 | if (transforms.length === 0) { |
| 778 | yield* yieldAbortable(source, signal); |
| 779 | return; |
| 780 | } |
| 781 | |
| 782 | const normalized = yieldAbortable(source, signal); |
| 783 | |
| 784 | // Create internal controller for transform cancellation. |
| 785 | // Note: if signal was already aborted, we threw above - no need to check here. |
| 786 | const controller = new AbortController(); |
| 787 | let abortHandler; |
| 788 | if (signal) { |
| 789 | abortHandler = () => { |
| 790 | controller.abort(signal.reason ?? |
| 791 | lazyDOMException('Aborted', 'AbortError')); |
| 792 | }; |
| 793 | signal.addEventListener('abort', abortHandler, { __proto__: null, once: true }); |
| 794 | } |
| 795 | |
| 796 | // Apply transforms - fuse consecutive stateless transforms into a single |
| 797 | // generator layer to avoid unnecessary async generator ticks. |
| 798 | // |
| 799 | // INVARIANT: Each transform invocation MUST receive its own fresh options |
| 800 | // object ({ __proto__: null, signal }). Transforms may mutate the options |
| 801 | // object, so sharing a single object across invocations would allow one |
| 802 | // transform to corrupt the options seen by another. The signal is shared |
| 803 | // across calls (mutations to it are acceptable), but the containing options |
| 804 | // object must be unique per call. This is enforced inside |
| 805 | // applyFusedStatelessAsyncTransforms and applyStatefulAsyncTransform, which |
| 806 | // accept the signal directly and create the options object per invocation. |
| 807 | // DO NOT pass a pre-built options object. |
| 808 | let current = normalized; |
| 809 | const transformSignal = controller.signal; |
| 810 | let statelessRun = []; |
| 811 | |
| 812 | for (let i = 0; i < transforms.length; i++) { |
| 813 | const transform = transforms[i]; |
| 814 | if (isTransformObject(transform)) { |
| 815 | // Flush any accumulated stateless run before the stateful transform |
| 816 | if (statelessRun.length > 0) { |
| 817 | current = applyFusedStatelessAsyncTransforms(current, statelessRun, |
| 818 | transformSignal); |
| 819 | statelessRun = []; |
| 820 | } |
| 821 | const opts = { __proto__: null, signal: transformSignal }; |
| 822 | if (transform[kValidatedTransform]) { |
| 823 | current = applyValidatedStatefulAsyncTransform( |
| 824 | current, transform.transform, opts); |
| 825 | } else { |
| 826 | current = applyStatefulAsyncTransform( |
| 827 | current, transform.transform, opts); |
| 828 | } |
| 829 | } else { |
no test coverage detected
searching dependent graphs…