MCPcopy
hub / github.com/nowork-studio/NotFair / dispatchJob

Function dispatchJob

notfair-cmo/src/server/scheduler/tick.ts:65–128  ·  view source on GitHub ↗
(job: ScheduledJob)

Source from the content-addressed store, hash-verified

63}
64
65async function dispatchJob(job: ScheduledJob): Promise<string | null> {
66 const project = getProject(job.project_slug);
67 if (!project) {
68 throw new Error(`project not found: ${job.project_slug}`);
69 }
70 const adapter = requireAdapter(project.harness_adapter);
71 const session = getOrCreateSession({
72 project_slug: project.slug,
73 agent_id: job.agent_id,
74 label: `cron:${job.name}`,
75 harness_adapter: project.harness_adapter,
76 });
77 appendTranscriptEvent(session.id, "user", { text: job.message, source: "cron" });
78
79 // Prefer the adapter's explicit `final` event for the summary; fall back to
80 // concatenated `delta` chunks if the adapter only streams text. Truncate so
81 // we never bloat the DB with a multi-MB run row.
82 let finalText: string | null = null;
83 let deltaBuffer = "";
84 const MAX_SUMMARY = 4000;
85 const errors: { message: string; transient: boolean }[] = [];
86
87 for await (const evt of adapter.execute({
88 projectSlug: project.slug,
89 agentId: job.agent_id,
90 workspaceDir: workspaceDirFor(job.agent_id),
91 message: job.message,
92 threadId: session.id,
93 harnessSessionId: session.harness_session_id,
94 })) {
95 if (evt.kind === "session") {
96 touchSession(session.id, evt.harnessSessionId);
97 continue;
98 }
99 appendTranscriptEvent(session.id, evt.kind, evt);
100 if (evt.kind === "final") {
101 finalText = evt.text;
102 } else if (evt.kind === "delta" && deltaBuffer.length < MAX_SUMMARY) {
103 deltaBuffer += evt.text;
104 } else if (evt.kind === "error") {
105 // Buffer rather than throw. Adapters (notably codex-local) can emit
106 // several error events during a single retry burst — the last one is
107 // usually the richest (exit code + stderr tail from execute.ts's
108 // close handler). Throwing on the first event surfaced opaque retry
109 // chatter like "Reconnecting... 2/5 (...)" and discarded the
110 // actionable post-exit message that arrived a few hundred ms later.
111 errors.push({ message: evt.message, transient: evt.transient ?? false });
112 }
113 }
114 touchSession(session.id);
115
116 // No `final` and we have errors → the turn failed. Prefer the most recent
117 // non-transient error (the terminal exit-code message). Fall back to the
118 // last entry when everything was tagged transient (we ran out of richer
119 // signal; the retry chatter is all we have).
120 if (finalText === null && errors.length > 0) {
121 const terminal = [...errors].reverse().find((e) => !e.transient);
122 throw new Error((terminal ?? errors[errors.length - 1]!).message);

Callers 1

runTickSafeFunction · 0.85

Calls 7

getProjectFunction · 0.90
requireAdapterFunction · 0.90
getOrCreateSessionFunction · 0.90
appendTranscriptEventFunction · 0.90
touchSessionFunction · 0.90
executeMethod · 0.80
workspaceDirForFunction · 0.70

Tested by

no test coverage detected