Execute all plan steps in dependency order.
(self, plan: Plan, question: str)
| 76 | return descriptions |
| 77 | |
async def execute_plan(self, plan: Plan, question: str) -> list[StepResult]:
    """Execute all plan steps in dependency order.

    Steps run one at a time, in the order returned by
    ``plan.resolved_order()``. A failed step is logged and recorded like any
    other; it does not stop the remaining steps from running.

    Args:
        plan: The plan whose steps are executed.
        question: Passed through unchanged to ``execute_step`` for every step.

    Returns:
        One ``StepResult`` per step, in execution order. Each result has
        ``duration_ms`` set to the elapsed time of its ``execute_step`` call,
        measured with ``time.perf_counter()``.
    """
    ordered = plan.resolved_order()
    total = len(ordered)

    # Pre-fetch tool schemas for all servers referenced in the plan so that
    # _resolve_args_with_llm can include exact parameter names in its prompt.
    server_names = {step.server for step in ordered}
    tool_schemas: dict[str, dict[str, str]] = {}  # server -> {tool_name -> sig}
    for name in server_names:
        path = self._server_paths.get(name)
        if path is None:
            # No known path for this server: its steps get an empty schema.
            continue
        try:
            tools = await _list_tools(path)
            # Render each tool's parameters as "name: type" pairs, with a
            # trailing "?" marking parameters that are not required.
            tool_schemas[name] = {
                t["name"]: ", ".join(
                    f"{p['name']}: {p['type']}{'?' if not p['required'] else ''}"
                    for p in t.get("parameters", [])
                )
                for t in tools
            }
        except Exception as exc:  # noqa: BLE001
            # Schema lookup is best-effort: keep going with no schemas for
            # this server, but leave a trace so the degradation is visible.
            _log.warning(
                "Could not load tool schemas for server %s: %s", name, exc
            )
            tool_schemas[name] = {}

    # Results of already-executed steps, keyed by step number, handed to
    # execute_step for each subsequent step.
    context: dict[int, StepResult] = {}
    results: list[StepResult] = []
    for step in ordered:
        _log.info(
            "Step %d/%d [%s]: %s",
            step.step_number,
            total,
            step.server,
            step.task,
        )
        # Falls back to "" when the server or the tool has no known schema.
        schema = tool_schemas.get(step.server, {}).get(step.tool, "")
        step_started = time.perf_counter()
        result = await self.execute_step(
            step, context, question, tool_schema=schema
        )
        result.duration_ms = (time.perf_counter() - step_started) * 1000
        if result.success:
            _log.info("Step %d OK.", step.step_number)
        else:
            _log.warning("Step %d FAILED: %s", step.step_number, result.error)
        # Failed results are stored too, so later steps can see the failure.
        context[step.step_number] = result
        results.append(result)
    return results
| 126 | |
| 127 | async def execute_step( |
| 128 | self, |