Execute a callback tool by name.
(
cls,
tool_name: str,
arguments: dict[str, Any],
task: dict | None = None,
)
| 36 | |
@classmethod
def call_tool(
    cls,
    tool_name: str,
    arguments: dict[str, Any],
    task: dict | None = None,
) -> CallToolResult | CreateTaskResult:
    """Run the callback registered under ``tool_name`` and return its result.

    The callback is looked up in the current app's ``mcp_callback_map`` and
    executed with ``arguments`` via ``run_callback``.

    Args:
        tool_name: Name passed to ``find_by_tool_name`` to locate the callback.
        arguments: Forwarded unchanged to ``run_callback``.
        task: Only consulted for callbacks whose ``_cb_info`` has a truthy
            ``"background"`` entry. When it is not ``None`` the object
            returned by ``create_task`` is handed back as-is; otherwise that
            object is first passed through ``task_result_to_tool_result``.
            The contents of the dict are not read here.

    Returns:
        For a background callback, the task result (or its conversion, see
        ``task``). For any other callback, the value produced by
        ``format_callback_response``. If ``run_callback`` raises
        ``CallbackExecutionError``, a ``CallToolResult`` with ``isError=True``
        whose single text item is the exception message.

    Raises:
        ToolNotFoundError: ``find_by_tool_name`` returned ``None`` for
            ``tool_name``.
    """
    callback = get_app().mcp_callback_map.find_by_tool_name(tool_name)
    if callback is None:
        not_found_message = (
            f"Tool not found: {tool_name}."
            " The app's callbacks may have changed."
            " Please call tools/list to refresh your tool list."
        )
        raise ToolNotFoundError(not_found_message)

    # The background flag is read before the callback is executed.
    # pylint: disable-next=protected-access
    runs_in_background = bool(callback._cb_info.get("background"))

    try:
        response = run_callback(callback, arguments)
    except CallbackExecutionError as exc:
        # Execution failures are reported in-band rather than propagated.
        error_text = TextContent(type="text", text=str(exc))
        return CallToolResult(content=[error_text], isError=True)

    if not runs_in_background:
        return format_callback_response(response, callback)

    created = create_task(dict(response), callback)
    return created if task is not None else task_result_to_tool_result(created)
nothing calls this directly
no test coverage detected