(request: AgentRunnerRunRequest)
| 224 | } |
| 225 | |
| 226 | run(request: AgentRunnerRunRequest): Observable<BaseEvent> { |
| 227 | // Check if thread is already running in database |
| 228 | const runState = this.getRunState(request.threadId); |
| 229 | if (runState.isRunning) { |
| 230 | throw new Error("Thread already running"); |
| 231 | } |
| 232 | |
| 233 | // Mark thread as running in database |
| 234 | this.setRunState(request.threadId, true, request.input.runId); |
| 235 | |
| 236 | // Track seen message IDs and current run events in memory for this run |
| 237 | const seenMessageIds = new Set<string>(); |
| 238 | const currentRunEvents: BaseEvent[] = []; |
| 239 | |
| 240 | // Get all previously seen message IDs from historic runs |
| 241 | const historicRuns = this.getHistoricRuns(request.threadId); |
| 242 | const historicMessageIds = new Set<string>(); |
| 243 | for (const run of historicRuns) { |
| 244 | for (const event of run.events) { |
| 245 | if ("messageId" in event && typeof event.messageId === "string") { |
| 246 | historicMessageIds.add(event.messageId); |
| 247 | } |
| 248 | if (event.type === EventType.RUN_STARTED) { |
| 249 | const runStarted = event as RunStartedEvent; |
| 250 | const messages = runStarted.input?.messages ?? []; |
| 251 | for (const message of messages) { |
| 252 | historicMessageIds.add(message.id); |
| 253 | } |
| 254 | } |
| 255 | } |
| 256 | } |
| 257 | |
| 258 | // Get or create subject for this thread's connections |
| 259 | const nextSubject = new ReplaySubject<BaseEvent>(Infinity); |
| 260 | const prevConnection = ACTIVE_CONNECTIONS.get(request.threadId); |
| 261 | const prevSubject = prevConnection?.subject; |
| 262 | |
| 263 | // Create a subject for run() return value |
| 264 | const runSubject = new ReplaySubject<BaseEvent>(Infinity); |
| 265 | |
| 266 | // Update the active connection for this thread |
| 267 | ACTIVE_CONNECTIONS.set(request.threadId, { |
| 268 | subject: nextSubject, |
| 269 | agent: request.agent, |
| 270 | runSubject, |
| 271 | currentEvents: currentRunEvents, |
| 272 | stopRequested: false, |
| 273 | }); |
| 274 | |
| 275 | // Helper function to run the agent and handle errors |
| 276 | const runAgent = async () => { |
| 277 | // Get parent run ID for chaining |
| 278 | const parentRunId = this.getLatestRunId(request.threadId); |
| 279 | |
| 280 | try { |
| 281 | await request.agent.runAgent(request.input, { |
| 282 | onEvent: ({ event }) => { |
| 283 | let processedEvent: BaseEvent = event; |
Nothing calls this method directly.
No test coverage detected.