MCPcopy Index your code
hub / github.com/nodejs/node / map

Function map

lib/internal/streams/operators.js:37–182  ·  view source on GitHub ↗
(fn, options)

Source from the content-addressed store, hash-verified

35const kEof = Symbol('kEof');
36
37function map(fn, options) {
38 validateFunction(fn, 'fn');
39 if (options != null) {
40 validateObject(options, 'options');
41 }
42 if (options?.signal != null) {
43 validateAbortSignal(options.signal, 'options.signal');
44 }
45
46 let concurrency = 1;
47 if (options?.concurrency != null) {
48 concurrency = MathFloor(options.concurrency);
49 }
50
51 let highWaterMark = concurrency - 1;
52 if (options?.highWaterMark != null) {
53 highWaterMark = MathFloor(options.highWaterMark);
54 }
55
56 validateInteger(concurrency, 'options.concurrency', 1);
57 validateInteger(highWaterMark, 'options.highWaterMark', 0);
58
59 highWaterMark += concurrency;
60
61 return async function* map() {
62 const signal = AbortSignal.any([options?.signal].filter(Boolean));
63 const stream = this;
64 const queue = [];
65 const signalOpt = { signal };
66
67 let next;
68 let resume;
69 let done = false;
70 let cnt = 0;
71
72 function onCatch() {
73 done = true;
74 afterItemProcessed();
75 }
76
77 function afterItemProcessed() {
78 cnt -= 1;
79 maybeResume();
80 }
81
82 function maybeResume() {
83 if (
84 resume &&
85 !done &&
86 cnt < concurrency &&
87 queue.length < highWaterMark
88 ) {
89 resume();
90 resume = null;
91 }
92 }
93
94 async function pump() {

Callers

nothing calls this directly

Calls 8

validateAbortSignalFunction · 0.85
maybeResumeFunction · 0.85
pumpFunction · 0.70
resumeFunction · 0.70
anyMethod · 0.65
filterMethod · 0.65
callMethod · 0.45
shiftMethod · 0.45

Tested by

no test coverage detected