MCPcopy Index your code
hub / github.com/nodejs/node / makeZlibTransform

Function makeZlibTransform

lib/internal/streams/iter/transform.js:306–522  ·  view source on GitHub ↗
(createHandleFn, processFlag, finishFlag)

Source from the content-addressed store, hash-verified

304// so I/O and upstream work overlap with threadpool compression.
305// ---------------------------------------------------------------------------
306function makeZlibTransform(createHandleFn, processFlag, finishFlag) {
307 return {
308 __proto__: null,
309 [kValidatedTransform]: true,
310 transform: async function*(source, options) {
311 const { signal } = options;
312
313 // Fail fast if already aborted - don't allocate a native handle.
314 signal?.throwIfAborted();
315
316 // ---- Per-invocation state shared with the write callback ----
317 let outBuf;
318 let outOffset = 0;
319 let chunkSize;
320 let pending = [];
321 let pendingBytes = 0;
322
323 // Current write operation state (read by the callback for looping).
324 let resolveWrite, rejectWrite;
325 let writeInput, writeFlush;
326 let writeInOff, writeAvailIn, writeAvailOutBefore;
327
328 // processCallback: called by C++ AfterThreadPoolWork when compression
329 // on the threadpool completes. Collects output, loops if the engine
330 // has more output to produce (availOut === 0), then resolves the
331 // promise when all output for this input chunk is collected.
332 function onWriteComplete() {
333 const availOut = writeState[0];
334 const availInAfter = writeState[1];
335 const have = writeAvailOutBefore - availOut;
336 const bufferExhausted = availOut === 0 || outOffset + have >= chunkSize;
337
338 if (have > 0) {
339 if (bufferExhausted && outOffset === 0) {
340 // Entire buffer filled from start - yield directly, no copy.
341 ArrayPrototypePush(pending, outBuf);
342 } else if (bufferExhausted) {
343 // Tail of buffer filled and buffer is being replaced -
344 // subarray is safe since outBuf reference is overwritten below.
345 ArrayPrototypePush(pending,
346 outBuf.subarray(outOffset, outOffset + have));
347 } else {
348 // Partial fill, buffer will be reused - must copy.
349 ArrayPrototypePush(pending,
350 TypedArrayPrototypeSlice(outBuf,
351 outOffset,
352 outOffset + have));
353 }
354 pendingBytes += have;
355 outOffset += have;
356 }
357
358 // Reallocate output buffer if exhausted.
359 if (bufferExhausted) {
360 outBuf = Buffer.allocUnsafe(chunkSize);
361 outOffset = 0;
362 }
363

Callers 8

compressGzipFunction · 0.85
compressDeflateFunction · 0.85
compressBrotliFunction · 0.85
compressZstdFunction · 0.85
decompressGzipFunction · 0.85
decompressDeflateFunction · 0.85
decompressBrotliFunction · 0.85
decompressZstdFunction · 0.85

Calls 8

processInputAsyncFunction · 0.85
drainBatchFunction · 0.85
throwIfAbortedMethod · 0.80
addEventListenerMethod · 0.65
nextMethod · 0.65
removeEventListenerMethod · 0.65
closeMethod · 0.65
returnMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…