MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / connected_runner

Function connected_runner

tests/server/test_runner.py:93–148  ·  view source on GitHub ↗

Yield `(client, runner)` running over an in-memory JSON-RPC dispatcher pair. Starts the client (echo handlers) and the server-side dispatcher loop (kernel `on_request`/`on_notify` + `aclose_shielded` teardown - the `serve_connection` shape) in a task group, wraps the body in `anyio.fail_after(5)`, and cancels on exit.

(
    server: SrvT,
    *,
    initialized: bool = True,
    init_options: InitializationOptions | None = None,
    dispatch_middleware: list[DispatchMiddleware] | None = None,
    connection: Connection | None = None,
)

Source from the content-addressed store, hash-verified

91
@asynccontextmanager
async def connected_runner(
    server: SrvT,
    *,
    initialized: bool = True,
    init_options: InitializationOptions | None = None,
    dispatch_middleware: list[DispatchMiddleware] | None = None,
    connection: Connection | None = None,
) -> AsyncIterator[tuple[JSONRPCDispatcher[TransportContext], ServerRunner[dict[str, Any]]]]:
    """Provide a live `(client, runner)` pair wired through `jsonrpc_pair()`.

    Two tasks are started in one task group: the client dispatcher, served by
    echo handlers bound to a fresh `Recorder`, and the server dispatcher,
    served by the runner's `on_request` / `on_notify`. When the server loop
    ends, the connection is closed through `aclose_shielded`.

    The optional `initialize` request and the caller's `async with` body both
    execute under a single `anyio.fail_after(5)` deadline. With `initialized`
    true, the `initialize` request is sent through the client before control is
    handed to the caller; with it false, the caller receives the pair without
    that request having been sent.

    If `connection` is omitted, one is created with
    `Connection.for_loop(...)` on the server-side dispatcher; otherwise the
    supplied connection is handed to the runner unchanged.

    Anything raised while initializing or inside the caller's body is held
    until the task group has exited and is then re-raised as-is, so callers
    see the original exception rather than a group wrapper.
    """
    client, server_d, close = jsonrpc_pair()
    assert isinstance(client, JSONRPCDispatcher)
    assert isinstance(server_d, JSONRPCDispatcher)

    conn = Connection.for_loop(server_d) if connection is None else connection
    runner = ServerRunner(
        server=server,
        connection=conn,
        lifespan_state={},
        init_options=init_options,
        dispatch_middleware=dispatch_middleware or [],
    )
    echo_request, echo_notify = echo_handlers(Recorder())
    failure: BaseException | None = None

    async def _serve(*, task_status: anyio.abc.TaskStatus[None]) -> None:
        # Server half of the pair; the connection is always closed once the
        # dispatcher loop stops, whether it returned or raised.
        try:
            await server_d.run(runner.on_request, runner.on_notify, task_status=task_status)
        finally:
            await aclose_shielded(conn)

    async with anyio.create_task_group() as tg:
        await tg.start(client.run, echo_request, echo_notify)
        await tg.start(_serve)
        try:
            with anyio.fail_after(5):
                if initialized:
                    await client.send_raw_request("initialize", _initialize_params())
                yield client, runner
        except BaseException as exc:
            # Hold the error until after the task group has exited so it is
            # not reported wrapped in an ExceptionGroup.
            failure = exc
        # Runs on both the success and the failure path, inside the group.
        close()

    if failure is None:
        return
    raise failure
149
150

Calls 9

ServerRunnerClass · 0.90
jsonrpc_pairFunction · 0.85
echo_handlersFunction · 0.85
RecorderClass · 0.85
_initialize_paramsFunction · 0.85
closeFunction · 0.85
for_loopMethod · 0.80
startMethod · 0.45
send_raw_requestMethod · 0.45

Tested by

no test coverage detected