MCPcopy Index your code
hub / github.com/nodejs/node / createBatchedAsyncIterator

Function createBatchedAsyncIterator

lib/internal/streams/iter/classic.js:138–198  ·  view source on GitHub ↗
(stream, normalize)

Source from the content-addressed store, hash-verified

136const nop = () => {};
137
138async function* createBatchedAsyncIterator(stream, normalize) {
139 let callback = nop;
140
141 function next(resolve) {
142 if (this === stream) {
143 callback();
144 callback = nop;
145 } else {
146 callback = resolve;
147 }
148 }
149
150 stream.on('readable', next);
151
152 let error;
153 const cleanup = eos(stream, { writable: false }, (err) => {
154 error = err ? aggregateTwoErrors(error, err) : null;
155 callback();
156 callback = nop;
157 });
158
159 try {
160 while (true) {
161 const chunk = stream.destroyed ? null : stream.read();
162 if (chunk !== null) {
163 const batch = [chunk];
164 while (batch.length < MAX_DRAIN_BATCH &&
165 stream._readableState?.length > 0) {
166 const c = stream.read();
167 if (c === null) break;
168 ArrayPrototypePush(batch, c);
169 }
170 if (normalize !== null) {
171 const result = await normalize(batch);
172 if (result !== null) {
173 yield result;
174 }
175 } else {
176 yield batch;
177 }
178 } else if (error) {
179 throw error;
180 } else if (error === null) {
181 return;
182 } else {
183 await new Promise(next);
184 }
185 }
186 } catch (err) {
187 error = aggregateTwoErrors(error, err);
188 throw error;
189 } finally {
190 if (error === undefined ||
191 (stream._readableState?.autoDestroy)) {
192 destroyImpl.destroyer(stream, null);
193 } else {
194 stream.off('readable', next);
195 cleanup();

Callers 2

readable.jsFile · 0.85
fromReadableFunction · 0.85

Calls 8

eosFunction · 0.85
aggregateTwoErrorsFunction · 0.85
cleanupFunction · 0.70
callbackFunction · 0.50
normalizeFunction · 0.50
onMethod · 0.45
readMethod · 0.45
offMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…