MCPcopy Index your code
hub / github.com/nodejs/node / pipelineImpl

Function pipelineImpl

lib/internal/streams/pipeline.js:186–415  ·  view source on GitHub ↗
(streams, callback, opts)

Source from the content-addressed store, hash-verified

184}
185
186function pipelineImpl(streams, callback, opts) {
187 if (streams.length === 1 && ArrayIsArray(streams[0])) {
188 streams = streams[0];
189 }
190
191 if (streams.length < 2) {
192 throw new ERR_MISSING_ARGS('streams');
193 }
194
195 const ac = new AbortController();
196 const signal = ac.signal;
197 const outerSignal = opts?.signal;
198
199 // Need to cleanup event listeners if last stream is readable
200 // https://github.com/nodejs/node/issues/35452
201 const lastStreamCleanup = [];
202
203 validateAbortSignal(outerSignal, 'options.signal');
204
205 function abort() {
206 finishImpl(new AbortError(undefined, { cause: outerSignal?.reason }));
207 }
208
209 addAbortListener ??= require('internal/events/abort_listener').addAbortListener;
210 let disposable;
211 if (outerSignal) {
212 disposable = addAbortListener(outerSignal, abort);
213 }
214
215 let error;
216 let value;
217 const destroys = [];
218
219 let finishCount = 0;
220
221 function finish(err) {
222 finishImpl(err, --finishCount === 0);
223 }
224
225 function finishOnlyHandleError(err) {
226 finishImpl(err, false);
227 }
228
229 function finishImpl(err, final) {
230 if (err && (!error || error.code === 'ERR_STREAM_PREMATURE_CLOSE' || error.name === 'AbortError')) {
231 error = err;
232 }
233
234 if (!error && !final) {
235 return;
236 }
237
238 while (destroys.length) {
239 destroys.shift()(error);
240 }
241
242 disposable?.[SymbolDispose]();
243 ac.abort();

Callers 1

pipelineFunction · 0.85

Calls 15

validateAbortSignalFunction · 0.85
isNodeStreamFunction · 0.85
isReadableFunction · 0.85
isReadableNodeStreamFunction · 0.85
isTransformStreamFunction · 0.85
makeAsyncIterableFunction · 0.85
pumpToNodeFunction · 0.85
isWebStreamFunction · 0.85
pumpToWebFunction · 0.85
destroyerFunction · 0.70
isIterableFunction · 0.70
isReadableStreamFunction · 0.70

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…