(prompt, opts = {})
| 53 | } |
| 54 | |
| 55 | const agent: WorkflowHooks['agent'] = async (prompt, opts = {}) => { |
| 56 | const r = ctx.resources |
| 57 | if (r.agentCountBox.value >= MAX_TOTAL_AGENTS) { |
| 58 | throw new WorkflowError( |
| 59 | `workflow exceeds total agent cap (${MAX_TOTAL_AGENTS})`, |
| 60 | ) |
| 61 | } |
| 62 | |
| 63 | // Assign a unique id to each agent() call (including journal hits); stamp started/done so the reducer can associate them precisely |
| 64 | const agentId = r.agentIdSeq.value++ |
| 65 | |
| 66 | const params: AgentRunParams = { prompt, ...opts } |
| 67 | const key = agentCallKey(prompt, params) |
| 68 | const label = opts.label as string | undefined |
| 69 | const phase = |
| 70 | (opts.phase as string | undefined) ?? ctx.currentPhase ?? undefined |
| 71 | |
| 72 | // Journal hit -> return cached result directly |
| 73 | if (!ctx.journalInvalidated && ctx.journalIndex < ctx.journal.length) { |
| 74 | const entry = ctx.journal[ctx.journalIndex]! |
| 75 | if (entry.key === key) { |
| 76 | ctx.journalIndex++ |
| 77 | emit({ |
| 78 | type: 'agent_done', |
| 79 | agentId, |
| 80 | label, |
| 81 | phase, |
| 82 | result: entry.result, |
| 83 | }) |
| 84 | return resultToOutput(entry.result) |
| 85 | } |
| 86 | // Divergence: discard subsequent journal entries; everything from here on runs live |
| 87 | ctx.journalInvalidated = true |
| 88 | ctx.journal = ctx.journal.slice(0, ctx.journalIndex) |
| 89 | await ctx.ports.journalStore.truncate(ctx.runId) |
| 90 | } |
| 91 | |
| 92 | let release: () => void |
| 93 | try { |
| 94 | release = await ctx.resources.semaphore.acquire(ctx.signal) |
| 95 | } catch { |
| 96 | // Queued wait during abort: the semaphore already removed the waiter and did not consume a permit |
| 97 | throw new WorkflowAbortedError() |
| 98 | } |
| 99 | try { |
| 100 | if (ctx.signal.aborted) throw new WorkflowAbortedError() |
| 101 | // Budget check inside the semaphore critical section: a queued waiter sees the latest spent when woken, |
| 102 | // otherwise N waiters enqueued while spent=0 all pass the check and overspend on wake-up without re-check. |
| 103 | // Journal-hit path does not charge budget and needs no check. |
| 104 | r.budget.assertCanSpend() |
| 105 | |
| 106 | const pending = ctx.ports.taskRegistrar.pendingAction(ctx.runId) |
| 107 | if (pending?.kind === 'skip') { |
| 108 | const result: AgentRunResult = { kind: 'skipped' } |
| 109 | emit({ type: 'agent_done', agentId, label, phase, result }) |
| 110 | return null |
| 111 | } |
| 112 |
no test coverage detected