MCPcopy Index your code
hub / github.com/nodejs/node / createAsyncPipeline

Function createAsyncPipeline

lib/internal/streams/iter/pull.js:772–862  ·  view source on GitHub ↗

Create an async pipeline from source through transforms. `@yields {Uint8Array[]}`

(source, transforms, signal)

Source from the content-addressed store, hash-verified

770 * @yields {Uint8Array[]}
771 */
772async function* createAsyncPipeline(source, transforms, signal) {
773 // Check for abort
774 signal?.throwIfAborted();
775
776 // Fast path: no transforms, just yield normalized source directly
777 if (transforms.length === 0) {
778 yield* yieldAbortable(source, signal);
779 return;
780 }
781
782 const normalized = yieldAbortable(source, signal);
783
784 // Create internal controller for transform cancellation.
785 // Note: if signal was already aborted, we threw above - no need to check here.
786 const controller = new AbortController();
787 let abortHandler;
788 if (signal) {
789 abortHandler = () => {
790 controller.abort(signal.reason ??
791 lazyDOMException('Aborted', 'AbortError'));
792 };
793 signal.addEventListener('abort', abortHandler, { __proto__: null, once: true });
794 }
795
796 // Apply transforms - fuse consecutive stateless transforms into a single
797 // generator layer to avoid unnecessary async generator ticks.
798 //
799 // INVARIANT: Each transform invocation MUST receive its own fresh options
800 // object ({ __proto__: null, signal }). Transforms may mutate the options
801 // object, so sharing a single object across invocations would allow one
802 // transform to corrupt the options seen by another. The signal is shared
803 // across calls (mutations to it are acceptable), but the containing options
804 // object must be unique per call. This is enforced inside
805 // applyFusedStatelessAsyncTransforms and applyStatefulAsyncTransform, which
806 // accept the signal directly and create the options object per invocation.
807 // DO NOT pass a pre-built options object.
808 let current = normalized;
809 const transformSignal = controller.signal;
810 let statelessRun = [];
811
812 for (let i = 0; i < transforms.length; i++) {
813 const transform = transforms[i];
814 if (isTransformObject(transform)) {
815 // Flush any accumulated stateless run before the stateful transform
816 if (statelessRun.length > 0) {
817 current = applyFusedStatelessAsyncTransforms(current, statelessRun,
818 transformSignal);
819 statelessRun = [];
820 }
821 const opts = { __proto__: null, signal: transformSignal };
822 if (transform[kValidatedTransform]) {
823 current = applyValidatedStatefulAsyncTransform(
824 current, transform.transform, opts);
825 } else {
826 current = applyStatefulAsyncTransform(
827 current, transform.transform, opts);
828 }
829 } else {

Callers 2

[SymbolAsyncIterator] · Function · 0.85
pipeTo · Function · 0.85

Calls 11

abort · Method · 0.95
yieldAbortable · Function · 0.85
lazyDOMException · Function · 0.85
isTransformObject · Function · 0.85
wrapError · Function · 0.85
throwIfAborted · Method · 0.80
addEventListener · Method · 0.65
removeEventListener · Method · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…