(recorder: Recorder)
| 40 | |
| 41 | |
def echo_handlers(recorder: Recorder) -> tuple[OnRequest, OnNotify]:
    """Return an ``(on_request, on_notify)`` handler pair that logs onto ``recorder``.

    ``on_request`` appends ``(method, params-without-_meta)`` to
    ``recorder.requests`` and the dispatch context to ``recorder.contexts``,
    then returns ``{"echoed": method, "params": ...}``.

    ``on_notify`` appends ``(method, params)`` unchanged to
    ``recorder.notifications`` and calls ``recorder.notified.set()``.
    """

    async def on_request(
        ctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None
    ) -> dict[str, Any]:
        # `_meta` is dropped before recording: the JSON-RPC outbound path
        # always attaches it (otel injection), so removing it keeps JSON-RPC
        # and direct dispatch recordings identical.
        if params is None:
            # A missing params object is recorded as None, not as an empty dict.
            recorded = None
        else:
            source = params or {}
            recorded = {key: value for key, value in source.items() if key != "_meta"}

        recorder.requests.append((method, recorded))
        recorder.contexts.append(ctx)

        # The echoed result always carries a dict, even when nothing was recorded.
        echoed_params = recorded if recorded else {}
        return {"echoed": method, "params": echoed_params}

    async def on_notify(ctx: DispatchContext[TransportContext], method: str, params: Mapping[str, Any] | None) -> None:
        # Notifications are stored exactly as received (no `_meta` stripping).
        entry = (method, params)
        recorder.notifications.append(entry)
        # Flag that at least one notification has arrived.
        recorder.notified.set()

    return on_request, on_notify
| 58 | |
| 59 | |
| 60 | @asynccontextmanager |
no outgoing calls
no test coverage detected