MCPcopy
hub / github.com/promptfoo/promptfoo / ProviderGroupedCallQueue

Class ProviderGroupedCallQueue

src/scheduler/providerCallQueue.ts:14–81  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

12}
13
14export class ProviderGroupedCallQueue implements ProviderCallQueue {
15 private jobs: QueuedProviderCall<unknown>[] = [];
16 private waiters: (() => void)[] = [];
17
/**
 * Adds a provider call to the queue and returns a promise for its result.
 *
 * The returned promise is not settled here: its `resolve`/`reject` callbacks
 * are stored on the queued job so that whoever executes the job later can
 * settle it.
 *
 * @param providerId - Identifier used to group this call with others for the same provider.
 * @param call - The deferred provider call to execute later.
 * @returns A promise tied to the queued job's `resolve`/`reject` callbacks.
 */
enqueue<T>(providerId: string, call: () => Promise<T>): Promise<T> {
  // Bind at enqueue time so the call later runs in the enqueuer's async
  // execution context rather than that of whoever drains the queue.
  const contextBoundCall = AsyncResource.bind(call) as () => Promise<unknown>;

  return new Promise<T>((resolve, reject) => {
    const job: QueuedProviderCall<unknown> = {
      call: contextBoundCall,
      providerId,
      reject,
      resolve: resolve as (result: unknown) => void,
    };
    this.jobs.push(job);
    // Let anything waiting on the queue know that work is now available.
    this.notifyWaiters();
  });
}
30
/**
 * Reports whether any jobs are currently queued.
 *
 * @returns `true` when the queue holds at least one job, otherwise `false`.
 */
hasJobs(): boolean {
  return this.jobs.length !== 0;
}
34
/**
 * Removes and returns every queued job belonging to a single provider.
 *
 * The provider is `preferredProviderId` when it is truthy and at least one
 * queued job has that id (so an empty string counts as "no preference");
 * otherwise it is the provider of the first job in the queue. Jobs for other
 * providers stay queued, and relative order is preserved in both the returned
 * group and the remaining queue.
 *
 * @param preferredProviderId - Optional provider to drain first if it has queued jobs.
 * @returns The removed jobs for the chosen provider, or an empty array when the queue is empty.
 */
takeNextGroup(preferredProviderId?: string): QueuedProviderCall<unknown>[] {
  if (this.jobs.length < 1) {
    return [];
  }

  // Default to the head of the queue; switch to the preferred provider only
  // when it actually has work waiting.
  let targetProviderId = this.jobs[0].providerId;
  if (
    preferredProviderId &&
    this.jobs.some(({ providerId }) => providerId === preferredProviderId)
  ) {
    targetProviderId = preferredProviderId;
  }

  // Single pass that splits the queue into the chosen group and the rest.
  const matched: QueuedProviderCall<unknown>[] = [];
  const leftover: QueuedProviderCall<unknown>[] = [];
  this.jobs.forEach((job) => {
    (job.providerId === targetProviderId ? matched : leftover).push(job);
  });

  this.jobs = leftover;
  return matched;
}
58
/**
 * Returns a promise that resolves once the queue has work.
 *
 * If jobs are already queued the promise is resolved immediately. Otherwise
 * the promise's resolver is stored in `waiters`; it stays pending until
 * something invokes that resolver (presumably `notifyWaiters`, which is not
 * shown here - confirm against its implementation).
 *
 * @returns A promise that resolves with no value when a job is available.
 */
waitForJob(): Promise<void> {
  if (!this.hasJobs()) {
    return new Promise<void>((wake) => {
      this.waiters.push(wake);
    });
  }
  return Promise.resolve();
}
67
68 async run(job: QueuedProviderCall<unknown>): Promise<void> {
69 try {
70 job.resolve(await job.call());
71 } catch (error) {

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…