| 207 | |
| 208 | /** @internal */ |
| 209 | export function createWorkQueue< |
| 210 | T, |
| 211 | I, |
| 212 | G extends Group<G>, |
| 213 | S extends Stream<T, I, G, S>, |
| 214 | >(initialWork: Work<T, I, G, S> | undefined): WorkQueue<T, I, G, S> { |
| 215 | const rootGroups = new Set<G>(); |
| 216 | const rootStreams = new Set<S>(); |
| 217 | const groupNodes = new Map<G, GroupNode<T, I, G, S>>(); |
| 218 | const taskNodes = new Map<Task<T, I, G, S>, TaskNode<T, I, G, S>>(); |
| 219 | let pushGraphEvent!: (e: GraphEvent<T, I, G, S>) => PromiseOrValue<void>; |
| 220 | let stopGraphEvents!: (err?: unknown) => PromiseOrValue<void>; |
| 221 | |
| 222 | const { newGroups: initialRootGroups, newStreams: initialRootStreams } = |
| 223 | maybeIntegrateWork(initialWork); |
| 224 | const nonEmptyInitialRootGroups = pruneEmptyGroups(initialRootGroups); |
| 225 | // Register the initial root groups and streams eagerly so that they can |
| 226 | // still be cancelled if cancellation occurs before the work queue starts |
| 227 | for (const group of nonEmptyInitialRootGroups) { |
| 228 | rootGroups.add(group); |
| 229 | } |
| 230 | for (const stream of initialRootStreams) { |
| 231 | rootStreams.add(stream); |
| 232 | } |
| 233 | |
| 234 | const events = new Queue<GraphEvent<T, I, G, S>>( |
| 235 | ({ push: _push, stop: _stop, onStop, started }) => { |
| 236 | pushGraphEvent = _push; |
| 237 | stopGraphEvents = _stop; |
| 238 | // eslint-disable-next-line @typescript-eslint/no-floating-promises |
| 239 | started.then(() => { |
| 240 | for (const group of rootGroups) { |
| 241 | startGroup(group); |
| 242 | } |
| 243 | for (const stream of rootStreams) { |
| 244 | // eslint-disable-next-line @typescript-eslint/no-floating-promises |
| 245 | startStream(stream); |
| 246 | } |
| 247 | }); |
| 248 | onStop((reason) => cancel(reason)); |
| 249 | }, |
| 250 | 1, |
| 251 | ).subscribe((graphEvents) => handleGraphEvents(graphEvents)); |
| 252 | |
| 253 | return { |
| 254 | initialGroups: nonEmptyInitialRootGroups, |
| 255 | initialStreams: initialRootStreams, |
| 256 | events, |
| 257 | }; |
| 258 | |
| 259 | function cancel(reason?: unknown): PromiseOrValue<void> { |
| 260 | const cancelPromises: Array<Promise<unknown>> = []; |
| 261 | for (const group of rootGroups) { |
| 262 | cancelGroup(group, reason, cancelPromises); |
| 263 | } |
| 264 | for (const stream of rootStreams) { |
| 265 | cancelStream(stream, reason, cancelPromises); |
| 266 | } |