hub / github.com/TaskingAI/TaskingAI / api_execute

Function api_execute

plugin/app/routes/execute.py:107–140
(
    data: RunToolRequest,
)

Source from the content-addressed store, hash-verified

    response_model=Union[RunToolResponse, BaseDataResponse],
)
async def api_execute(
    data: RunToolRequest,
):
    plugin: PluginHandler = get_plugin_handler(
        bundle_id=data.bundle_id,
        plugin_id=data.plugin_id,
    )
    if not plugin:
        raise_http_error(ErrorCode.OBJECT_NOT_FOUND, f"Plugin {data.plugin_id} not found")

    cleaned_dict = {k: v for k, v in data.input_params.items() if v is not None}
    data.input_params = cleaned_dict

    try:
        plugin_output = await plugin.execute(
            credentials=data.credentials,
            plugin_input=PluginInput(input_params=data.input_params, project_id=data.project_id),
        )
    except TKHttpException as e:
        logger.error(f"api_execute: {data.bundle_id}/{data.plugin_id}. Input params: {data.input_params}. Error: {e}")
        return BaseDataResponse(
            data={
                "status": e.status_code,
                "data": {
                    "error": e.detail.get("message", "Error when handling response from provider"),
                },
            }
        )
    except Exception as e:
        raise_http_error(ErrorCode.INTERNAL_SERVER_ERROR, str(e))

    return RunToolResponse(
        data=plugin_output,
    )
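
The listing returns two differently shaped payloads: the plugin output on success, and a status-plus-error body when the provider call fails with TKHttpException. The sketch below imitates that control flow using only the standard library so the two shapes can be compared side by side. ProviderError, FakeRequest, fake_plugin and execute_sketch are hypothetical stand-ins, not TaskingAI names, and the plain dictionaries only approximate what RunToolResponse and BaseDataResponse would serialize to.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Dict


class ProviderError(Exception):
    """Hypothetical stand-in for TKHttpException (status code plus detail dict)."""

    def __init__(self, status_code: int, detail: Dict[str, Any]):
        super().__init__(detail.get("message", ""))
        self.status_code = status_code
        self.detail = detail


@dataclass
class FakeRequest:
    """Hypothetical stand-in for RunToolRequest, reduced to two fields."""

    plugin_id: str
    input_params: Dict[str, Any] = field(default_factory=dict)


async def fake_plugin(input_params: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical plugin: fails on request, otherwise echoes its input."""
    if input_params.get("fail"):
        raise ProviderError(429, {"message": "rate limited"})
    return {"echo": input_params}


async def execute_sketch(request: FakeRequest) -> Dict[str, Any]:
    # Drop parameters whose value is None, as the listing does before
    # handing the input to the plugin.
    params = {k: v for k, v in request.input_params.items() if v is not None}
    try:
        output = await fake_plugin(params)
    except ProviderError as e:
        # A provider-level failure is reported inside the response body
        # rather than raised to the caller.
        return {
            "data": {
                "status": e.status_code,
                "data": {
                    "error": e.detail.get(
                        "message", "Error when handling response from provider"
                    ),
                },
            }
        }
    # Any other exception propagates here; the real handler converts it
    # into an internal server error through raise_http_error.
    return {"data": output}


ok = asyncio.run(execute_sketch(FakeRequest("demo", {"q": "hi", "lang": None})))
failed = asyncio.run(execute_sketch(FakeRequest("demo", {"fail": True})))
print(ok)
print(failed)
```

In this sketch a caller has to inspect the body to tell the two outcomes apart, because the provider failure is returned rather than raised. Whether the real route behaves the same way at the HTTP level depends on how BaseDataResponse is defined, which this page does not show.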

Callers

nothing calls this directly

Calls 7

get_plugin_handler · Function · 0.90
raise_http_error · Function · 0.90
PluginInput · Class · 0.90
RunToolResponse · Class · 0.85
BaseDataResponse · Class · 0.70
execute · Method · 0.45
get · Method · 0.45

Tested by

no test coverage detected
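
One place a first test could start is the parameter-cleaning step, since it can be checked without a plugin or a running server. The sketch below is an assumption about how that might look: drop_none_params is a hypothetical helper that copies the dictionary comprehension from api_execute, and it does not exist in the repository. The tests are written as plain functions that a runner such as pytest could collect.

```python
from typing import Any, Dict


def drop_none_params(input_params: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper mirroring the None filter used in api_execute."""
    return {k: v for k, v in input_params.items() if v is not None}


def test_drop_none_params_keeps_falsy_values():
    # Only None is removed; 0, "", False and empty containers are kept.
    cleaned = drop_none_params({"a": None, "b": 0, "c": "", "d": False, "e": []})
    assert cleaned == {"b": 0, "c": "", "d": False, "e": []}


def test_drop_none_params_is_shallow():
    # Nested None values survive because the filter is not recursive.
    cleaned = drop_none_params({"outer": {"inner": None}})
    assert cleaned == {"outer": {"inner": None}}
```

The first case matters because the filter tests `is not None` rather than truthiness, so a plugin still receives explicit zeros, empty strings and False.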