MCPcopy
hub / github.com/IBM/AssetOpsBench / run

Method run

src/mcphub/__init__.py:202–222  ·  view source on GitHub ↗

Execute a tool or workflow. ToolUniverse form: run({"name": "iot.sensors", "arguments": {...}}) Shorthand: run("iot.sensors", {...})

(self, query, arguments: Optional[dict] = None)

Source from the content-addressed store, hash-verified

200
201 # -- step 3: run ------------------------------------------------------- #
202 def run(self, query, arguments: Optional[dict] = None) -> Any:
203 """Execute a tool or workflow.
204
205 ToolUniverse form: run({"name": "iot.sensors", "arguments": {...}})
206 Shorthand: run("iot.sensors", {...})
207 """
208 if isinstance(query, dict):
209 name = query["name"]
210 args = query.get("arguments", {}) or {}
211 else:
212 name = query
213 args = arguments or {}
214
215 if name in self._workflows:
216 return self._workflows[name](self, **args)
217
218 server, tool = self._resolve(name)
219 result = self._worker.call(server, tool, args)
220 if getattr(result, "isError", False):
221 raise RuntimeError(f"Tool '{name}' failed: {_unwrap(result)}")
222 return _unwrap(result)
223
224 # -- discovery --------------------------------------------------------- #
225 def find_tools(self, description: str, limit: int = 5) -> List[dict]:

Callers 3

mainFunction · 0.95
chiller_triageFunction · 0.45
sensor_inventory_gapFunction · 0.45

Calls 4

_resolveMethod · 0.95
_unwrapFunction · 0.85
callMethod · 0.80
getMethod · 0.45

Tested by

no test coverage detected