// Shared no-op function. createBatchedAsyncIterator uses it as the initial
// value of its pending-wakeup `callback` and resets `callback` to it after each
// invocation, so `callback()` can always be called unconditionally.
| 136 | const nop = () => {}; |
| 137 | |
| 138 | async function* createBatchedAsyncIterator(stream, normalize) { |
| 139 | let callback = nop; |
| 140 | |
| 141 | function next(resolve) { |
| 142 | if (this === stream) { |
| 143 | callback(); |
| 144 | callback = nop; |
| 145 | } else { |
| 146 | callback = resolve; |
| 147 | } |
| 148 | } |
| 149 | |
| 150 | stream.on('readable', next); |
| 151 | |
| 152 | let error; |
| 153 | const cleanup = eos(stream, { writable: false }, (err) => { |
| 154 | error = err ? aggregateTwoErrors(error, err) : null; |
| 155 | callback(); |
| 156 | callback = nop; |
| 157 | }); |
| 158 | |
| 159 | try { |
| 160 | while (true) { |
| 161 | const chunk = stream.destroyed ? null : stream.read(); |
| 162 | if (chunk !== null) { |
| 163 | const batch = [chunk]; |
| 164 | while (batch.length < MAX_DRAIN_BATCH && |
| 165 | stream._readableState?.length > 0) { |
| 166 | const c = stream.read(); |
| 167 | if (c === null) break; |
| 168 | ArrayPrototypePush(batch, c); |
| 169 | } |
| 170 | if (normalize !== null) { |
| 171 | const result = await normalize(batch); |
| 172 | if (result !== null) { |
| 173 | yield result; |
| 174 | } |
| 175 | } else { |
| 176 | yield batch; |
| 177 | } |
| 178 | } else if (error) { |
| 179 | throw error; |
| 180 | } else if (error === null) { |
| 181 | return; |
| 182 | } else { |
| 183 | await new Promise(next); |
| 184 | } |
| 185 | } |
| 186 | } catch (err) { |
| 187 | error = aggregateTwoErrors(error, err); |
| 188 | throw error; |
| 189 | } finally { |
| 190 | if (error === undefined || |
| 191 | (stream._readableState?.autoDestroy)) { |
| 192 | destroyImpl.destroyer(stream, null); |
| 193 | } else { |
| 194 | stream.off('readable', next); |
| 195 | cleanup(); |