(tag: str)
| 95 | echoes: dict[str, str] = {} |
| 96 | |
| 97 | async def sample(tag: str) -> None: |
| 98 | result = await ctx.session.create_message( # pyright: ignore[reportDeprecated] |
| 99 | messages=[SamplingMessage(role="user", content=TextContent(text=tag))], |
| 100 | max_tokens=10, |
| 101 | ) |
| 102 | assert isinstance(result.content, TextContent) |
| 103 | echoes[tag] = result.content.text |
| 104 | |
| 105 | async with anyio.create_task_group() as sampler_group: |
| 106 | sampler_group.start_soon(sample, "x") |
nothing calls this directly
no test coverage detected