MCPcopy
hub / github.com/graphql/graphql-js / createWorkQueue

Function createWorkQueue

src/execution/incremental/WorkQueue.ts:209–701  ·  view source on GitHub ↗
(initialWork: Work<T, I, G, S> | undefined)

Source from the content-addressed store, hash-verified

207
208/** @internal */
209export function createWorkQueue<
210 T,
211 I,
212 G extends Group<G>,
213 S extends Stream<T, I, G, S>,
214>(initialWork: Work<T, I, G, S> | undefined): WorkQueue<T, I, G, S> {
215 const rootGroups = new Set<G>();
216 const rootStreams = new Set<S>();
217 const groupNodes = new Map<G, GroupNode<T, I, G, S>>();
218 const taskNodes = new Map<Task<T, I, G, S>, TaskNode<T, I, G, S>>();
219 let pushGraphEvent!: (e: GraphEvent<T, I, G, S>) => PromiseOrValue<void>;
220 let stopGraphEvents!: (err?: unknown) => PromiseOrValue<void>;
221
222 const { newGroups: initialRootGroups, newStreams: initialRootStreams } =
223 maybeIntegrateWork(initialWork);
224 const nonEmptyInitialRootGroups = pruneEmptyGroups(initialRootGroups);
225 // Initialize root groups and streams at startup to prepare for cancellation
226 // prior to starting the work queue
227 for (const group of nonEmptyInitialRootGroups) {
228 rootGroups.add(group);
229 }
230 for (const stream of initialRootStreams) {
231 rootStreams.add(stream);
232 }
233
234 const events = new Queue<GraphEvent<T, I, G, S>>(
235 ({ push: _push, stop: _stop, onStop, started }) => {
236 pushGraphEvent = _push;
237 stopGraphEvents = _stop;
238 // eslint-disable-next-line @typescript-eslint/no-floating-promises
239 started.then(() => {
240 for (const group of rootGroups) {
241 startGroup(group);
242 }
243 for (const stream of rootStreams) {
244 // eslint-disable-next-line @typescript-eslint/no-floating-promises
245 startStream(stream);
246 }
247 });
248 onStop((reason) => cancel(reason));
249 },
250 1,
251 ).subscribe((graphEvents) => handleGraphEvents(graphEvents));
252
253 return {
254 initialGroups: nonEmptyInitialRootGroups,
255 initialStreams: initialRootStreams,
256 events,
257 };
258
259 function cancel(reason?: unknown): PromiseOrValue<void> {
260 const cancelPromises: Array<Promise<unknown>> = [];
261 for (const group of rootGroups) {
262 cancelGroup(group, reason, cancelPromises);
263 }
264 for (const stream of rootStreams) {
265 cancelStream(stream, reason, cancelPromises);
266 }

Callers 4

buildResponseMethod · 0.90
collectWorkRunFunction · 0.90
WorkQueue-test.tsFile · 0.90
buildResponseMethod · 0.90

Calls 8

maybeIntegrateWorkFunction · 0.85
pruneEmptyGroupsFunction · 0.85
startGroupFunction · 0.85
startStreamFunction · 0.85
cancelFunction · 0.85
handleGraphEventsFunction · 0.85
subscribeMethod · 0.80
addMethod · 0.45

Tested by 1

collectWorkRunFunction · 0.72