MCPcopy
hub / github.com/IBM/AssetOpsBench / execute_plan

Method execute_plan

src/agent/plan_execute/executor.py:78–125  ·  view source on GitHub ↗

Execute all plan steps in dependency order.

(self, plan: Plan, question: str)

Source from the content-addressed store, hash-verified

76 return descriptions
77
78 async def execute_plan(self, plan: Plan, question: str) -> list[StepResult]:
79 """Execute all plan steps in dependency order."""
80 ordered = plan.resolved_order()
81 total = len(ordered)
82
83 # Pre-fetch tool schemas for all servers referenced in the plan so that
84 # _resolve_args_with_llm can include exact parameter names in its prompt.
85 server_names = {step.server for step in ordered}
86 tool_schemas: dict[str, dict[str, str]] = {} # server -> {tool_name -> sig}
87 for name in server_names:
88 path = self._server_paths.get(name)
89 if path is None:
90 continue
91 try:
92 tools = await _list_tools(path)
93 tool_schemas[name] = {
94 t["name"]: ", ".join(
95 f"{p['name']}: {p['type']}{'?' if not p['required'] else ''}"
96 for p in t.get("parameters", [])
97 )
98 for t in tools
99 }
100 except Exception: # noqa: BLE001
101 tool_schemas[name] = {}
102
103 context: dict[int, StepResult] = {}
104 results: list[StepResult] = []
105 for step in ordered:
106 _log.info(
107 "Step %d/%d [%s]: %s",
108 step.step_number,
109 total,
110 step.server,
111 step.task,
112 )
113 schema = tool_schemas.get(step.server, {}).get(step.tool, "")
114 step_started = time.perf_counter()
115 result = await self.execute_step(
116 step, context, question, tool_schema=schema
117 )
118 result.duration_ms = (time.perf_counter() - step_started) * 1000
119 if result.success:
120 _log.info("Step %d OK.", step.step_number)
121 else:
122 _log.warning("Step %d FAILED: %s", step.step_number, result.error)
123 context[step.step_number] = result
124 results.append(result)
125 return results
126
127 async def execute_step(
128 self,

Calls 4

execute_stepMethod · 0.95
_list_toolsFunction · 0.85
resolved_orderMethod · 0.80
getMethod · 0.45