Use super agent to reply to a query Args: query: User query context: COW context (optional, contains session_id for user isolation) on_event: Event callback (optional) clear_history: Whether to clear conversation history
(self, query: str, context: Context = None,
on_event=None, clear_history: bool = False)
| 435 | return count |
| 436 | |
| 437 | def agent_reply(self, query: str, context: Context = None, |
| 438 | on_event=None, clear_history: bool = False) -> Reply: |
| 439 | """ |
| 440 | Use super agent to reply to a query |
| 441 | |
| 442 | Args: |
| 443 | query: User query |
| 444 | context: COW context (optional, contains session_id for user isolation) |
| 445 | on_event: Event callback (optional) |
| 446 | clear_history: Whether to clear conversation history |
| 447 | |
| 448 | Returns: |
| 449 | Reply object |
| 450 | """ |
| 451 | session_id = None |
| 452 | agent = None |
| 453 | request_id = None |
| 454 | cancel_event = None |
| 455 | try: |
| 456 | # Extract session_id from context for user isolation |
| 457 | if context: |
| 458 | session_id = context.kwargs.get("session_id") or context.get("session_id") |
| 459 | request_id = context.kwargs.get("request_id") or context.get("request_id") |
| 460 | |
| 461 | # Register a cancel token. Prefer per-turn request_id (web), |
| 462 | # fall back to session_id (IM channels). The Event is polled by |
| 463 | # AgentStreamExecutor at safe checkpoints. |
| 464 | registry = get_cancel_registry() |
| 465 | token_key = request_id or session_id |
| 466 | if token_key: |
| 467 | cancel_event = registry.register(token_key, session_id=session_id) |
| 468 | |
| 469 | # Get agent for this session (will auto-initialize if needed) |
| 470 | agent = self.get_agent(session_id=session_id) |
| 471 | if not agent: |
| 472 | return Reply(ReplyType.ERROR, "Failed to initialize super agent") |
| 473 | |
| 474 | # Create event handler for logging and channel communication |
| 475 | event_handler = AgentEventHandler(context=context, original_callback=on_event) |
| 476 | |
| 477 | # Filter tools based on context |
| 478 | original_tools = agent.tools |
| 479 | filtered_tools = original_tools |
| 480 | |
| 481 | # If this is a scheduled task execution, exclude scheduler tool to prevent recursion |
| 482 | if context and context.get("is_scheduled_task"): |
| 483 | filtered_tools = [tool for tool in agent.tools if tool.name != "scheduler"] |
| 484 | agent.tools = filtered_tools |
| 485 | logger.info(f"[AgentBridge] Scheduled task execution: excluded scheduler tool ({len(filtered_tools)}/{len(original_tools)} tools)") |
| 486 | else: |
| 487 | # Attach context to scheduler tool if present |
| 488 | if context and agent.tools: |
| 489 | for tool in agent.tools: |
| 490 | if tool.name == "scheduler": |
| 491 | try: |
| 492 | from agent.tools.scheduler.integration import attach_scheduler_to_tool |
| 493 | attach_scheduler_to_tool(tool, context) |
| 494 | except Exception as e: |
no test coverage detected