MCPcopy Index your code
hub / github.com/volcengine/MineContext / run_async

Method run_async

opencontext/tools/tools_executor.py:35–76  ·  view source on GitHub ↗
(self, tool_name: str, tool_input: Dict[str, Any])

Source from the content-addressed store, hash-verified

33 }
34
35 async def run_async(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
36 if tool_name in self._tools_map:
37 tool = self._tools_map[tool_name]
38
39 if (
40 isinstance(tool_input, list)
41 and len(tool_input) == 1
42 and isinstance(tool_input[0], dict)
43 ):
44 tool_input = tool_input[0]
45
46 # Ensure tool_input is dictionary type
47 if not isinstance(tool_input, dict):
48 return {
49 "error": f"Tool parameter format error: expected dict, got {type(tool_input).__name__}",
50 "message": "Tool parameters must be in dictionary format",
51 "received_type": type(tool_input).__name__,
52 }
53
54 return tool.execute(**tool_input)
55 else:
56 # Log unknown tool call but don't throw exception, return warning message
57 import logging
58 from difflib import get_close_matches
59
60 logger = logging.getLogger(__name__)
61
62 # Provide similar tool name suggestions
63 available_tools = list(self._tools_map.keys())
64 suggestions = get_close_matches(tool_name, available_tools, n=3, cutoff=0.6)
65 suggestion_text = f"Suggested tools: {', '.join(suggestions)}" if suggestions else ""
66
67 error_msg = f"Unknown tool: {tool_name}. {suggestion_text}"
68 available_tools_text = f"Available tools: {', '.join(available_tools[:10])}" + (
69 "..." if len(available_tools) > 10 else ""
70 )
71 return {
72 "error": error_msg,
73 "message": "This tool does not exist, please use system-provided tools",
74 "available_tools": available_tools_text,
75 "suggestions": suggestions,
76 }
77
78 def run(self, tool_name: str, tool_input: Dict[str, Any]) -> Any:
79 if tool_name in self._tools_map:

Callers 4

batch_run_tools_asyncMethod · 0.95
execute_tool_asyncMethod · 0.80

Calls 1

executeMethod · 0.45

Tested by

no test coverage detected