(
input: RunAgentInput,
config: BuiltInAgentFactoryConfig,
)
| 1794 | } |
| 1795 | |
| 1796 | private runFactory( |
| 1797 | input: RunAgentInput, |
| 1798 | config: BuiltInAgentFactoryConfig, |
| 1799 | ): Observable<BaseEvent> { |
| 1800 | if (this.abortController) { |
| 1801 | throw new Error( |
| 1802 | "Agent is already running. Call abortRun() first or create a new instance.", |
| 1803 | ); |
| 1804 | } |
| 1805 | |
| 1806 | // Set synchronously before Observable creation to close TOCTOU window |
| 1807 | this.abortController = new AbortController(); |
| 1808 | const controller = this.abortController; |
| 1809 | |
| 1810 | return new Observable<BaseEvent>((subscriber) => { |
| 1811 | const startEvent: RunStartedEvent = { |
| 1812 | type: EventType.RUN_STARTED, |
| 1813 | threadId: input.threadId, |
| 1814 | runId: input.runId, |
| 1815 | }; |
| 1816 | subscriber.next(startEvent); |
| 1817 | |
| 1818 | const ctx: AgentFactoryContext = { |
| 1819 | input, |
| 1820 | abortController: controller, |
| 1821 | abortSignal: controller.signal, |
| 1822 | interrupt: async (interrupts: Interrupt[]) => { |
| 1823 | const resume = input.resume ?? []; |
| 1824 | const ids = new Set(interrupts.map((i) => i.id)); |
| 1825 | const matching = resume.filter((r) => ids.has(r.interruptId)); |
| 1826 | // All requested interrupts addressed → resume: return responses. |
| 1827 | if (interrupts.length > 0 && matching.length === interrupts.length) { |
| 1828 | return matching; |
| 1829 | } |
| 1830 | // Otherwise (fresh run, only some interrupts answered, or an empty interrupt list) → pause. |
| 1831 | throw new InterruptSignal(interrupts); |
| 1832 | }, |
| 1833 | }; |
| 1834 | |
| 1835 | // Resume injection (aisdk/tanstack): map each ResumeEntry to a tool-role |
| 1836 | // message keyed by interruptId (=== the paused tool call's id) and append |
| 1837 | // it to the messages the factory sees. Both SDK converters |
| 1838 | // (convertMessagesToVercelAISDKMessages / convertInputToTanStackAI) turn a |
| 1839 | // tool-role message into that SDK's native tool-result, so the model |
| 1840 | // continues — no SDK-specific approval-response wiring needed. The `custom` |
| 1841 | // factory reads input.resume itself via ctx.interrupt(), so leave it alone. |
| 1842 | // Idempotent: skip entries the client already recorded as a tool-result |
| 1843 | // message in the thread (useInterrupt persists resolutions so the |
| 1844 | // conversation stays well-formed across turns). Only synthesize results |
| 1845 | // for entries that aren't already answered, so we never double-answer a |
| 1846 | // tool call. |
| 1847 | const answeredToolCallIds = new Set( |
| 1848 | input.messages |
| 1849 | .filter((m) => m.role === "tool") |
| 1850 | .map((m) => (m as { toolCallId?: string }).toolCallId) |
| 1851 | .filter((id): id is string => typeof id === "string"), |
| 1852 | ); |
| 1853 | const resumeToolMessages: Message[] = (input.resume ?? []) |
no test coverage detected