Connect to an MCP server via stdio and call a tool.
(server_path: Path | str, tool_name: str, args: dict)
| 315 | |
| 316 | |
| 317 | async def _call_tool(server_path: Path | str, tool_name: str, args: dict) -> str: |
| 318 | """Connect to an MCP server via stdio and call a tool.""" |
| 319 | from mcp import ClientSession |
| 320 | from mcp.client.stdio import stdio_client |
| 321 | |
| 322 | params = _make_stdio_params(server_path) |
| 323 | async with stdio_client(params) as (read, write): |
| 324 | async with ClientSession(read, write) as session: |
| 325 | await session.initialize() |
| 326 | result = await session.call_tool(tool_name, args) |
| 327 | return _extract_content(result.content) |
| 328 | |
| 329 | |
| 330 | def _extract_content(content: list[Any]) -> str: |
no test coverage detected