MCPcopy Index your code
hub / github.com/modelcontextprotocol/python-sdk / JSONRPCDispatcher

Class JSONRPCDispatcher

src/mcp/shared/jsonrpc_dispatcher.py:229–770  ·  view source on GitHub ↗

`Dispatcher` over the `SessionMessage` stream contract. Explicit Protocol base so pyright checks conformance at the class definition.

Source from the content-addressed store, hash-verified

227
228
229class JSONRPCDispatcher(Dispatcher[TransportT]):
230 """`Dispatcher` over the `SessionMessage` stream contract.
231
232 Explicit Protocol base so pyright checks conformance at the class definition.
233 """
234
235 def __init__(
236 self,
237 read_stream: ReadStream[SessionMessage | Exception],
238 write_stream: WriteStream[SessionMessage],
239 *,
240 transport_builder: Callable[[MessageMetadata], TransportT] | None = None,
241 peer_cancel_mode: PeerCancelMode = "interrupt",
242 raise_handler_exceptions: bool = False,
243 inline_methods: frozenset[str] = frozenset(),
244 on_stream_exception: Callable[[Exception], Awaitable[None]] | None = None,
245 ) -> None:
246 """Wire a dispatcher over a transport's `SessionMessage` stream pair.
247
248 Args:
249 transport_builder: Builds each message's `TransportContext` from
250 its `SessionMessage.metadata`.
251 raise_handler_exceptions: Re-raise handler exceptions out of
252 `run()` after the error response is written.
253 inline_methods: Methods awaited in the read loop before the next
254 message is dequeued (e.g. `initialize`); an inline handler
255 that awaits the peer deadlocks the parked loop.
256 on_stream_exception: Observer for `Exception` items on the read
257 stream; without it they are debug-logged and dropped. Awaited
258 inline in the read loop, so a slow observer stalls dispatch.
259 """
260 self._read_stream = read_stream
261 self._write_stream = write_stream
262 # With transport_builder omitted, TransportT defaults to
263 # TransportContext; pyright can't connect the two, hence the cast.
264 self._transport_builder = cast(
265 "Callable[[MessageMetadata], TransportT]",
266 transport_builder or _default_transport_builder,
267 )
268 self._peer_cancel_mode: PeerCancelMode = peer_cancel_mode
269 self._raise_handler_exceptions = raise_handler_exceptions
270 self._inline_methods = inline_methods
271 self.on_stream_exception = on_stream_exception
272 """Observer for ``Exception`` items on the read stream. Mutable so a session can
273 bind it after the dispatcher is built (e.g. ``ClientSession`` routing into
274 ``message_handler``); only consulted inside ``run()`` so pre-enter assignment is safe."""
275
276 self._next_id = 0
277 self._pending: dict[RequestId, _Pending] = {}
278 self._in_flight: dict[RequestId, _InFlight[TransportT]] = {}
279 self._tg: anyio.abc.TaskGroup | None = None
280 self._running = False
281 self._closed = False
282
283 async def send_raw_request(
284 self,
285 method: str,
286 params: Mapping[str, Any] | None,

Calls

no outgoing calls