hub / github.com/modelcontextprotocol/python-sdk / running_pair

Function running_pair

tests/shared/test_dispatcher.py:61–85

Yield `(client, server, client_recorder, server_recorder)` with both `run()` loops live.

(
    factory: PairFactory,
    *,
    server_on_request: OnRequest | None = None,
    server_on_notify: OnNotify | None = None,
    client_on_request: OnRequest | None = None,
    client_on_notify: OnNotify | None = None,
    can_send_request: bool = True,
)


@asynccontextmanager
async def running_pair(
    factory: PairFactory,
    *,
    server_on_request: OnRequest | None = None,
    server_on_notify: OnNotify | None = None,
    client_on_request: OnRequest | None = None,
    client_on_notify: OnNotify | None = None,
    can_send_request: bool = True,
) -> AsyncIterator[tuple[Dispatcher[TransportContext], Dispatcher[TransportContext], Recorder, Recorder]]:
    """Yield `(client, server, client_recorder, server_recorder)` with both `run()` loops live."""
    client, server, close = factory(can_send_request=can_send_request)
    client_rec, server_rec = Recorder(), Recorder()
    c_req, c_notify = echo_handlers(client_rec)
    s_req, s_notify = echo_handlers(server_rec)
    try:
        async with anyio.create_task_group() as tg:
            await tg.start(client.run, client_on_request or c_req, client_on_notify or c_notify)
            await tg.start(server.run, server_on_request or s_req, server_on_notify or s_notify)
            try:
                yield client, server, client_rec, server_rec
            finally:
                tg.cancel_scope.cancel()
    finally:
        await resync_tracer()
        close()
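
The lifecycle in the source above (start both loops, wait until each is ready, yield to the caller, cancel on exit, then clean up) can be sketched without the SDK. This is only an illustration under stated assumptions: it uses the standard library `asyncio` in place of `anyio`, it signals readiness with an `asyncio.Event` where the original relies on `tg.start`, and `ToyLoop`, `toy_running_pair`, and `demo` are hypothetical names that do not exist in python-sdk.

```python
import asyncio
from contextlib import asynccontextmanager


class ToyLoop:
    """Hypothetical stand-in for a dispatcher: run() reports ready, then idles."""

    def __init__(self) -> None:
        self.ready = asyncio.Event()
        self.stopped = False

    async def run(self) -> None:
        # Signal readiness (the role tg.start plays in the anyio original).
        self.ready.set()
        try:
            # Idle until cancelled; nothing ever sets this event.
            await asyncio.Event().wait()
        finally:
            self.stopped = True


@asynccontextmanager
async def toy_running_pair():
    client, server = ToyLoop(), ToyLoop()
    tasks = [
        asyncio.create_task(client.run()),
        asyncio.create_task(server.run()),
    ]
    try:
        # Hand the pair to the caller only once both loops are live.
        await client.ready.wait()
        await server.ready.wait()
        yield client, server
    finally:
        # Cancel both loops on exit and wait for them to unwind.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo() -> tuple[bool, bool]:
    async with toy_running_pair() as (client, server):
        live = client.ready.is_set() and server.ready.is_set() and not client.stopped
    # After the block exits, both loops should have been cancelled.
    return live, client.stopped and server.stopped


result = asyncio.run(demo())
```

Waiting for readiness before the `yield` is the point of the pattern: code inside the `async with` block can assume both loops are already running, and the `finally` clause stops them even if the block raises.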

Calls 6

resync_tracer · Function · 0.90
Recorder · Class · 0.85
echo_handlers · Function · 0.85
close · Function · 0.85
factory · Function · 0.70
start · Method · 0.45

Tested by

no test coverage detected