* Returns an `AsyncIterator` that iterates `event` events. * @param {EventEmitter} emitter * @param {string | symbol} event * @param {{ * signal?: AbortSignal; * close?: string[]; * highWaterMark?: number; * lowWaterMark?: number; * }} [options] * @returns {AsyncIterator}
(emitter, event, options = kEmptyObject)
| 1072 | * @returns {AsyncIterator} |
| 1073 | */ |
| 1074 | function on(emitter, event, options = kEmptyObject) { |
| 1075 | // Parameters validation |
| 1076 | validateObject(options, 'options'); |
| 1077 | const signal = options.signal; |
| 1078 | validateAbortSignal(signal, 'options.signal'); |
| 1079 | if (signal?.aborted) |
| 1080 | throw new AbortError(undefined, { cause: signal.reason }); |
| 1081 | // Support both highWaterMark and highWatermark for backward compatibility |
| 1082 | const highWatermark = options.highWaterMark ?? options.highWatermark ?? NumberMAX_SAFE_INTEGER; |
| 1083 | validateInteger(highWatermark, 'options.highWaterMark', 1); |
| 1084 | // Support both lowWaterMark and lowWatermark for backward compatibility |
| 1085 | const lowWatermark = options.lowWaterMark ?? options.lowWatermark ?? 1; |
| 1086 | validateInteger(lowWatermark, 'options.lowWaterMark', 1); |
| 1087 | |
| 1088 | // Preparing controlling queues and variables |
| 1089 | FixedQueue ??= require('internal/fixed_queue'); |
| 1090 | const unconsumedEvents = new FixedQueue(); |
| 1091 | const unconsumedPromises = new FixedQueue(); |
| 1092 | let paused = false; |
| 1093 | let error = null; |
| 1094 | let finished = false; |
| 1095 | let size = 0; |
| 1096 | |
| 1097 | const iterator = ObjectSetPrototypeOf({ |
| 1098 | next() { |
| 1099 | // First, we consume all unread events |
| 1100 | if (size) { |
| 1101 | const value = unconsumedEvents.shift(); |
| 1102 | size--; |
| 1103 | if (paused && size < lowWatermark) { |
| 1104 | emitter.resume(); // Can not be finished yet |
| 1105 | paused = false; |
| 1106 | } |
| 1107 | return PromiseResolve(createIterResult(value, false)); |
| 1108 | } |
| 1109 | |
| 1110 | // Then we error, if an error happened |
| 1111 | // This happens one time if at all, because after 'error' |
| 1112 | // we stop listening |
| 1113 | if (error) { |
| 1114 | const p = PromiseReject(error); |
| 1115 | // Only the first element errors |
| 1116 | error = null; |
| 1117 | return p; |
| 1118 | } |
| 1119 | |
| 1120 | // If the iterator is finished, resolve to done |
| 1121 | if (finished) return closeHandler(); |
| 1122 | |
| 1123 | // Wait until an event happens |
| 1124 | return new Promise(function(resolve, reject) { |
| 1125 | unconsumedPromises.push({ resolve, reject }); |
| 1126 | }); |
| 1127 | }, |
| 1128 | |
| 1129 | return() { |
| 1130 | return closeHandler(); |
| 1131 | }, |
nothing calls this directly
no test coverage detected