(reducer, initialValue, options)
| 234 | } |
| 235 | |
/**
 * Folds every item produced by `this` (an async iterable) into a single
 * value by awaiting `reducer(accumulator, item, { signal })` for each item
 * in order.
 *
 * When no `initialValue` argument is supplied, the first item is used as the
 * starting accumulator and `reducer` is not called for it.
 *
 * @param {Function} reducer Called as `reducer(accumulator, value, { signal })`;
 *   its (awaited) result becomes the next accumulator.
 * @param {any} [initialValue] Starting accumulator. Presence is detected via
 *   `arguments.length`, so an explicit `undefined` counts as provided.
 * @param {{ signal?: AbortSignal }} [options] Optional settings; `signal`
 *   lets the caller abort the reduction.
 * @returns {Promise<any>} The final accumulator.
 * @throws {AbortError} If `options.signal` is already aborted, or is found
 *   aborted when the next item arrives. In both cases `cause` is set to the
 *   signal's `reason`.
 * @throws {ReduceAwareErrMissingArgs} If the source yields no items and no
 *   initial value was supplied.
 */
async function reduce(reducer, initialValue, options) {
  validateFunction(reducer, 'reducer');
  if (options != null) {
    validateObject(options, 'options');
  }
  if (options?.signal != null) {
    validateAbortSignal(options.signal, 'options.signal');
  }

  // `arguments.length` (not `initialValue !== undefined`) so that an explicit
  // `undefined` initial value is honoured.
  let hasInitialValue = arguments.length > 1;
  if (options?.signal?.aborted) {
    const err = new AbortError(undefined, { cause: options.signal.reason });
    this.once('error', () => {}); // The error is already propagated
    await finished(this.destroy(err));
    throw err;
  }

  // Internal controller: its signal is what the reducer receives. It is
  // aborted when the caller's signal aborts and, unconditionally, when the
  // reduction ends (see `finally` below).
  const ac = new AbortController();
  const signal = ac.signal;
  if (options?.signal) {
    const opts = { once: true, [kWeakHandler]: this, [kResistStopPropagation]: true };
    options.signal.addEventListener('abort', () => ac.abort(), opts);
  }

  let gotAnyItemFromStream = false;
  try {
    for await (const value of this) {
      gotAnyItemFromStream = true;
      if (options?.signal?.aborted) {
        // Carry the caller's abort reason, matching the pre-aborted path
        // above, so the reason is not lost when aborting mid-stream.
        throw new AbortError(undefined, { cause: options.signal.reason });
      }
      if (!hasInitialValue) {
        // No seed supplied: the first item becomes the accumulator.
        initialValue = value;
        hasInitialValue = true;
      } else {
        initialValue = await reducer(initialValue, value, { signal });
      }
    }
    if (!gotAnyItemFromStream && !hasInitialValue) {
      throw new ReduceAwareErrMissingArgs();
    }
  } finally {
    // Always abort the internal signal on exit, whether the loop completed
    // or threw.
    ac.abort();
  }
  return initialValue;
}
| 280 | |
| 281 | async function toArray(options) { |
| 282 | if (options != null) { |
searching dependent graphs…