* Write an async source through transforms to a writer. * @param {AsyncIterable |Iterable } source * @param {...(Function|object)} args - Transforms, writer, and optional options * @returns {Promise } Total bytes written
(source, ...args)
| 989 | * @returns {Promise<number>} Total bytes written |
| 990 | */ |
| 991 | async function pipeTo(source, ...args) { |
| 992 | const { transforms, writer, options } = parsePipeToArgs(args, 'write'); |
| 993 | if (options?.signal !== undefined) { |
| 994 | validateAbortSignal(options.signal, 'options.signal'); |
| 995 | } |
| 996 | |
| 997 | const signal = options?.signal; |
| 998 | |
| 999 | // Check for abort |
| 1000 | signal?.throwIfAborted(); |
| 1001 | |
| 1002 | const hasWriteSync = typeof writer.writeSync === 'function'; |
| 1003 | const useSyncIterableFastPath = |
| 1004 | hasWriteSync && canUseSyncIterablePipeToFastPath(source, transforms, signal); |
| 1005 | const normalized = useSyncIterableFastPath ? undefined : from(source); |
| 1006 | |
| 1007 | let totalBytes = 0; |
| 1008 | const hasWritev = typeof writer.writev === 'function'; |
| 1009 | const hasWritevSync = typeof writer.writevSync === 'function'; |
| 1010 | const hasEndSync = typeof writer.endSync === 'function'; |
| 1011 | |
| 1012 | // Async fallback for writeBatch when sync write fails partway through. |
| 1013 | // Continues writing from batch[startIndex] using async write(). |
| 1014 | async function writeBatchAsyncFallback(batch, startIndex) { |
| 1015 | for (let i = startIndex; i < batch.length; i++) { |
| 1016 | const chunk = batch[i]; |
| 1017 | if (hasWriteSync && writer.writeSync(chunk)) { |
| 1018 | // Sync retry succeeded |
| 1019 | } else { |
| 1020 | const result = writer.write( |
| 1021 | chunk, signal ? { __proto__: null, signal } : undefined); |
| 1022 | if (result !== undefined) { |
| 1023 | await result; |
| 1024 | } |
| 1025 | } |
| 1026 | totalBytes += TypedArrayPrototypeGetByteLength(chunk); |
| 1027 | } |
| 1028 | } |
| 1029 | |
| 1030 | // Write a batch using try-fallback: sync first, async if needed. |
| 1031 | // Returns undefined on sync success, or a Promise when async fallback |
| 1032 | // is required. Callers must check: const p = writeBatch(b); if (p) await p; |
| 1033 | function writeBatch(batch) { |
| 1034 | if (hasWritev && batch.length > 1) { |
| 1035 | if (!hasWritevSync || !writer.writevSync(batch)) { |
| 1036 | const opts = signal ? { __proto__: null, signal } : undefined; |
| 1037 | const writevResult = writer.writev(batch, opts); |
| 1038 | if (writevResult === undefined) { |
| 1039 | for (let i = 0; i < batch.length; i++) { |
| 1040 | totalBytes += TypedArrayPrototypeGetByteLength(batch[i]); |
| 1041 | } |
| 1042 | return; |
| 1043 | } |
| 1044 | return PromisePrototypeThen(PromiseResolve(writevResult), () => { |
| 1045 | for (let i = 0; i < batch.length; i++) { |
| 1046 | totalBytes += TypedArrayPrototypeGetByteLength(batch[i]); |
| 1047 | } |
| 1048 | }); |
no test coverage detected
searching dependent graphs…