MCPcopy Index your code
hub / github.com/nodejs/node / pipeTo

Function pipeTo

lib/internal/streams/iter/pull.js:991–1134  ·  view source on GitHub ↗

* Write an async source through transforms to a writer. * @param {AsyncIterable|Iterable} source * @param {...(Function|object)} args - Transforms, writer, and optional options * @returns {Promise<number>} Total bytes written

(source, ...args)

Source from the content-addressed store, hash-verified

989 * @returns {Promise<number>} Total bytes written
990 */
991async function pipeTo(source, ...args) {
992 const { transforms, writer, options } = parsePipeToArgs(args, 'write');
993 if (options?.signal !== undefined) {
994 validateAbortSignal(options.signal, 'options.signal');
995 }
996
997 const signal = options?.signal;
998
999 // Check for abort
1000 signal?.throwIfAborted();
1001
1002 const hasWriteSync = typeof writer.writeSync === 'function';
1003 const useSyncIterableFastPath =
1004 hasWriteSync && canUseSyncIterablePipeToFastPath(source, transforms, signal);
1005 const normalized = useSyncIterableFastPath ? undefined : from(source);
1006
1007 let totalBytes = 0;
1008 const hasWritev = typeof writer.writev === 'function';
1009 const hasWritevSync = typeof writer.writevSync === 'function';
1010 const hasEndSync = typeof writer.endSync === 'function';
1011
1012 // Async fallback for writeBatch when sync write fails partway through.
1013 // Continues writing from batch[startIndex] using async write().
1014 async function writeBatchAsyncFallback(batch, startIndex) {
1015 for (let i = startIndex; i < batch.length; i++) {
1016 const chunk = batch[i];
1017 if (hasWriteSync && writer.writeSync(chunk)) {
1018 // Sync retry succeeded
1019 } else {
1020 const result = writer.write(
1021 chunk, signal ? { __proto__: null, signal } : undefined);
1022 if (result !== undefined) {
1023 await result;
1024 }
1025 }
1026 totalBytes += TypedArrayPrototypeGetByteLength(chunk);
1027 }
1028 }
1029
1030 // Write a batch using try-fallback: sync first, async if needed.
1031 // Returns undefined on sync success, or a Promise when async fallback
1032 // is required. Callers must check: const p = writeBatch(b); if (p) await p;
1033 function writeBatch(batch) {
1034 if (hasWritev && batch.length > 1) {
1035 if (!hasWritevSync || !writer.writevSync(batch)) {
1036 const opts = signal ? { __proto__: null, signal } : undefined;
1037 const writevResult = writer.writev(batch, opts);
1038 if (writevResult === undefined) {
1039 for (let i = 0; i < batch.length; i++) {
1040 totalBytes += TypedArrayPrototypeGetByteLength(batch[i]);
1041 }
1042 return;
1043 }
1044 return PromisePrototypeThen(PromiseResolve(writevResult), () => {
1045 for (let i = 0; i < batch.length; i++) {
1046 totalBytes += TypedArrayPrototypeGetByteLength(batch[i]);
1047 }
1048 });

Callers 15

runFunction · 0.85
runFunction · 0.85
runFunction · 0.85
runFunction · 0.85
runFunction · 0.85
testBasicWriteFunction · 0.85
testPipeToWithTransformFunction · 0.85
testAsyncValidationFunction · 0.85
testPipeToAsyncSABFunction · 0.85

Calls 14

parsePipeToArgsFunction · 0.85
validateAbortSignalFunction · 0.85
isUint8ArrayBatchFunction · 0.85
writeBatchFunction · 0.85
isUint8ArrayFunction · 0.85
normalizeAsyncValueFunction · 0.85
createAsyncPipelineFunction · 0.85
wrapErrorFunction · 0.85
throwIfAbortedMethod · 0.80
fromFunction · 0.70
endSyncMethod · 0.45

Tested by

no test coverage detected

Used in the wild: real call sites across dependent graphs

searching dependent graphs…