(
ctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
)
| 153 | """A handler's ctx.send_raw_request reaches the side that made the inbound request.""" |
| 154 | |
| 155 | async def server_on_request( |
| 156 | ctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None |
| 157 | ) -> dict[str, Any]: |
| 158 | sample = await ctx.send_raw_request("sampling/createMessage", {"prompt": "hi"}) |
| 159 | return {"sampled": sample} |
| 160 | |
| 161 | async with running_pair(pair_factory, server_on_request=server_on_request) as (client, _server, crec, _srec): |
| 162 | with anyio.fail_after(5): |
nothing calls this directly
no test coverage detected