MCPcopy Index your code
hub / github.com/modelcontextprotocol/python-sdk / jsonrpc_pair

Function jsonrpc_pair

tests/shared/conftest.py:33–48  ·  view source on GitHub ↗

Two `JSONRPCDispatcher`s wired over crossed in-memory streams.

(*, can_send_request: bool = True)

Source from the content-addressed store, hash-verified

31
32
def jsonrpc_pair(*, can_send_request: bool = True) -> DispatcherTriple:
    """Build a client/server `JSONRPCDispatcher` pair joined by in-memory streams.

    Two memory object streams are created, one per direction. The client is
    constructed with the server-to-client receive end and the client-to-server
    send end; the server is constructed with the opposite two ends.

    Args:
        can_send_request: Passed through unchanged to every `TransportContext`
            produced by the transport builder given to both dispatchers.

    Returns:
        A `(client, server, close)` triple, where calling `close()` closes all
        four stream endpoints.
    """
    capacity = 32
    to_server_tx, to_server_rx = anyio.create_memory_object_stream[SessionMessage | Exception](capacity)
    to_client_tx, to_client_rx = anyio.create_memory_object_stream[SessionMessage | Exception](capacity)

    def make_context(_meta: object) -> TransportContext:
        # The metadata argument is ignored; every context is built the same way.
        return TransportContext(kind="jsonrpc", can_send_request=can_send_request)

    client: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
        to_client_rx,
        to_server_tx,
        transport_builder=make_context,
    )
    server: JSONRPCDispatcher[TransportContext] = JSONRPCDispatcher(
        to_server_rx,
        to_client_tx,
        transport_builder=make_context,
    )

    def close() -> None:
        # Client-to-server endpoints first, then server-to-client endpoints.
        to_server_tx.close()
        to_server_rx.close()
        to_client_tx.close()
        to_client_rx.close()

    return client, server, close
49
50
51@pytest.fixture(

Callers 3

factory (Function) · 0.85
connected_runner (Function) · 0.85

Calls 1

JSONRPCDispatcher (Class) · 0.90

Tested by

no test coverage detected