MCPcopy
hub / github.com/CopilotKit/CopilotKit / createFleetQueueClient

Function createFleetQueueClient

showcase/harness/src/fleet/queue-client.ts:781–2330  ·  view source on GitHub ↗
(
  config: FleetQueueClientConfig,
)

Source from the content-addressed store, hash-verified

779}
780
781export function createFleetQueueClient(
782 config: FleetQueueClientConfig,
783): FleetQueueClientWithSweepOutcome {
784 const { pb, claim, logger } = config;
785 const rng = config.rng ?? Math.random;
786 const now = config.now ?? Date.now;
787 const sleep =
788 config.sleep ??
789 ((ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms)));
790 const stalePolicy = config.stalePending ?? {};
791 const staleExpiryPeriods =
792 stalePolicy.expiryPeriods ?? DEFAULT_STALE_PENDING_EXPIRY_PERIODS;
793 const staleDefaultPeriodMs =
794 stalePolicy.defaultPeriodMs ?? DEFAULT_STALE_PENDING_FAMILY_PERIOD_MS;
795 const stalePeriodMsFor = (family: string): number =>
796 stalePolicy.familyPeriodsMs?.[family] ?? staleDefaultPeriodMs;
797
798 // Per-client cache of a claimed job's decoded payload, keyed by jobId. The
799 // renew CAS returns the lifecycle columns but NOT the payload, and the
800 // convenience re-read used to re-hydrate it can momentarily fail (a PB read
801 // blip). Throwing on that blip permanently stops the worker's heartbeat,
802 // after which the sweeper reclaims the still-live job and synthesizes a FALSE
803 // `worker-crashed-mid-job` comm error. So we remember the payload at claim
804 // time and reuse it on renew, making the re-read a non-fatal convenience.
805 const payloadCache = new Map<string, ServiceJobPayload>();
806
807 // Last-known GOOD lease per held job, set at claim time and refreshed on
808 // every successful renew. This is what an INDETERMINATE renew (a thrown
809 // claim.renewLease — 5xx or 2xx-unreadable; the renew may or may not have
810 // committed) hands back to the heartbeat: the worker heartbeat treats a
811 // renewLease THROW like a lost lease (it breaks), so letting the throw
812 // escape kills the heartbeat → the sweeper reclaims a LIVE job → a FALSE
813 // worker-crashed-mid-job. Returning the current lease unchanged keeps the
814 // heartbeat alive so the NEXT beat retries; only a definitive
815 // `renewed: false` stops it. Evicted exactly where payloadCache is.
816 const leaseCache = new Map<string, JobLease>();
817
818 // CONCURRENT-SWEEP LATCH (see sweepExpired): the in-flight sweep's
819 // promise, or null when none is running. A sweep arriving while one is in
820 // flight piggybacks on this promise instead of racing the in-flight
821 // sweep's per-call grace set.
822 let sweepInFlight: Promise<SweepResultWithIndeterminate> | null = null;
823
824 /**
825 * Bounded-retry persistence of a per-service result onto an ALREADY
826 * TERMINAL probe_jobs row — the SEPARATE record write that follows a
827 * release CAS (migration 1779989700 adds `result` + `result_processed`).
828 * Never throws: returns `{ ok: true }` on success, or `{ ok: false,
829 * lastErr }` after RESULT_WRITE_MAX_ATTEMPTS attempts so the caller
830 * decides what the loss means (`report` — distinct "result lost" error).
831 * Used by `report()` ONLY: the decode-failure attribution in `claimNext`
832 * is a deliberate single-attempt write (its retry pacing would stall the
833 * claim race, and the consumer's crash synthesis is its backstop).
834 * `result_processed` seeds false: the consumer latches it true after
835 * aggregating exactly once.
836 */
837 async function writeResult(
838 jobId: string,

Callers 5

makeWorldFunction · 0.85
firstAttemptHistogramFunction · 0.85
runViaControlPlaneFunction · 0.85

Calls

no outgoing calls

Tested by 2

makeWorldFunction · 0.68
firstAttemptHistogramFunction · 0.68

Used in the wild real call sites across dependent graphs

searching dependent graphs…