hub / github.com/modelcontextprotocol/python-sdk / sample

Function sample

tests/client/test_session_concurrency.py:97–103
(tag: str)

Source from the content-addressed store, hash-verified

 95  echoes: dict[str, str] = {}
 96
 97  async def sample(tag: str) -> None:
 98      result = await ctx.session.create_message(  # pyright: ignore[reportDeprecated]
 99          messages=[SamplingMessage(role="user", content=TextContent(text=tag))],
100          max_tokens=10,
101      )
102      assert isinstance(result.content, TextContent)
103      echoes[tag] = result.content.text
104
105  async with anyio.create_task_group() as sampler_group:
106      sampler_group.start_soon(sample, "x")
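
The listing above starts `sample` as a task and stores each reply in `echoes` under the tag that was sent, presumably so the test can match every reply to its request. The following is a minimal, self-contained sketch of that pattern, not the SDK's test code. It assumes the standard-library `asyncio.gather` in place of the anyio task group, and `fake_create_message` and `run_samplers` are hypothetical names introduced only for illustration; `fake_create_message` stands in for `ctx.session.create_message` and simply echoes its input.

```python
import asyncio


async def fake_create_message(text: str) -> str:
    """Hypothetical stand-in for ctx.session.create_message: echoes its input."""
    await asyncio.sleep(0)  # yield to the event loop once, like a real await would
    return text


async def run_samplers(tags: list[str]) -> dict[str, str]:
    """Run one sampler per tag concurrently and collect the replies by tag."""
    echoes: dict[str, str] = {}

    async def sample(tag: str) -> None:
        # Store the reply under the tag that was sent, so a reply routed to
        # the wrong task would appear as a mismatched key/value pair.
        echoes[tag] = await fake_create_message(tag)

    # asyncio.gather (used here instead of an anyio task group) schedules
    # every sampler and waits for all of them to finish.
    await asyncio.gather(*(sample(tag) for tag in tags))
    return echoes


result = asyncio.run(run_samplers(["x", "y", "z"]))
```

After the run, each key in `result` should map to the same string as its value, which is the property a concurrency test of this shape can assert on.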

Callers

nothing calls this directly (it is passed to sampler_group.start_soon on line 106)

Calls 3

SamplingMessage · Class · 0.90
TextContent · Class · 0.90
create_message · Method · 0.80

Tested by

no test coverage detected