MCPcopy Index your code
hub / github.com/TaskingAI/TaskingAI / fetch_tools

Function fetch_tools

backend/app/services/tool/tool.py:40–82  ·  view source on GitHub ↗

Fetch tools. `:param tool_refs:` the tool references. `:return:` the tools.

(tool_refs: List[ToolRef])

Source from the content-addressed store, hash-verified

38
39
async def fetch_tools(tool_refs: List[ToolRef]) -> List[Tool]:
    """
    Fetch tools

    Each reference is resolved according to its type:

    - ``ToolType.ACTION``: the action is looked up by the reference ID; a
      reference whose lookup returns a falsy value is skipped.
    - ``ToolType.PLUGIN``: the reference ID must have the form
      ``<bundle_instance_id>/<plugin_id>`` with exactly one ``/``.

    A malformed plugin ID or an unrecognized tool type is reported through
    ``raise_request_validation_error`` (presumably raising; it is defined
    outside this function).

    :param tool_refs: the tool references
    :return: the tools, in the order their references were processed
    """

    tools: List[Tool] = []
    # todo: handle duplicate tool name

    for tool_ref in tool_refs:
        if tool_ref.type == ToolType.ACTION:
            action = await action_ops.get(action_id=tool_ref.id)
            if action:
                tools.append(
                    Tool(
                        tool_id=tool_ref.id,
                        type=ToolType.ACTION,
                        function_def=action.function_def.model_dump(),
                    )
                )

        elif tool_ref.type == ToolType.PLUGIN:
            # Require exactly one separator: an ID with extra slashes would
            # otherwise fail the two-name unpacking below with a ValueError
            # instead of producing a validation error.
            if tool_ref.id.count("/") != 1:
                raise_request_validation_error(f"Invalid plugin tool ID: {tool_ref.id}")
            bundle_instance_id, plugin_id = tool_ref.id.split("/")
            bundle: BundleInstance = await bundle_instance_ops.get(bundle_instance_id=bundle_instance_id)
            plugin = get_plugin(bundle_id=bundle.bundle_id, plugin_id=plugin_id)
            tools.append(
                Tool(
                    tool_id=tool_ref.id,
                    type=ToolType.PLUGIN,
                    function_def=plugin.function_def.model_dump(),
                )
            )

        else:
            raise_request_validation_error(f"Invalid tool type: {tool_ref.type}")

    return tools
83
84
85async def run_tools(tool_inputs: List[ToolInput]) -> List[ToolOutput]:

Callers 1

prepareMethod · 0.90

Calls 5

ToolClass · 0.90
get_pluginFunction · 0.50
getMethod · 0.45
model_dumpMethod · 0.45

Tested by

no test coverage detected