MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / handle_call_tool

Function handle_call_tool

tests/shared/test_streamable_http.py:1977–1992  ·  view source on GitHub ↗
(ctx: ServerRequestContext, params: CallToolRequestParams)

Source from the content-addressed store, hash-verified

1975 milestone.set()
1976
1977 async def handle_call_tool(ctx: ServerRequestContext, params: CallToolRequestParams) -> CallToolResult:
1978 assert params.name == "multi_close_tool"
1979 for i, milestone in enumerate(milestones.values()):
1980 await ctx.session.send_log_message( # pyright: ignore[reportDeprecated]
1981 level="info",
1982 data=f"checkpoint_{i}",
1983 logger="multi_close_tool",
1984 related_request_id=ctx.request_id,
1985 )
1986 assert ctx.close_sse_stream is not None
1987 await ctx.close_sse_stream()
1988 # Client and server share one event loop, so the tool can wait directly on the
1989 # client-side callback observing the reconnect.
1990 with anyio.fail_after(5):
1991 await milestone.wait()
1992 return CallToolResult(content=[TextContent(type="text", text="Completed 3 checkpoints")])
1993
1994 server = Server("multi_reconnect_server", on_call_tool=handle_call_tool)
1995

Callers

nothing calls this directly

Calls 5

CallToolResultClass · 0.90
TextContentClass · 0.90
send_log_messageMethod · 0.80
waitMethod · 0.80
close_sse_streamMethod · 0.45

Tested by

no test coverage detected