MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / _connect

Method _connect

src/mcp/client/_memory.py:41–96  ·  view source on GitHub ↗

Connect to the server and yield streams for communication.

(self)

Source from the content-addressed store, hash-verified

39
@asynccontextmanager
async def _connect(self) -> AsyncIterator[TransportStreams]:
    """Connect to the server and yield streams for communication.

    Runs the wrapped server in a background task over the stream pairs
    produced by `create_client_server_memory_streams`, then yields the
    client's `(read, write)` pair to the caller.

    On exit, both write ends are closed so the server sees EOF and can shut
    down on its own. If the server task has still not finished once the
    `SERVER_SHUTDOWN_GRACE` timeout elapses, the task group is cancelled as
    a backstop so the join cannot hang forever.

    Yields:
        The client-side read and write streams.
    """
    # Unwrap MCPServer to get underlying Server
    if isinstance(self._server, MCPServer):
        # TODO(Marcelo): Make `lowlevel_server` public.
        actual_server: Server[Any] = self._server._lowlevel_server  # type: ignore[reportPrivateUsage]
    else:
        actual_server = self._server

    async with create_client_server_memory_streams() as (client_streams, server_streams):
        client_read, client_write = client_streams
        server_read, server_write = server_streams

        # Signals that the server task has finished, so teardown can tell
        # a clean exit apart from a server that is stuck.
        server_done = anyio.Event()

        async def _run_server() -> None:
            """Run the server on its stream pair and flag completion."""
            try:
                await actual_server.run(
                    server_read,
                    server_write,
                    actual_server.create_initialization_options(),
                    raise_exceptions=self._raise_exceptions,
                )
            finally:
                # Set in `finally` so the event fires whether run() returns
                # normally or raises.
                server_done.set()

        async with anyio.create_task_group() as tg:
            tg.start_soon(_run_server)

            try:
                yield client_read, client_write
            finally:
                # EOF the server (and our own read side) instead of
                # cancelling outright. The dispatcher's run() cancels its
                # own in-flight handlers on read-stream EOF, so for a
                # well-behaved server the task exits naturally and the
                # task-group join below is immediate. Cancelling here
                # unconditionally would `coro.throw()` into this task,
                # which on CPython 3.11 (gh-106749) drops `'call'` trace
                # events for the outer await chain and desyncs coverage's
                # CTracer past the test frame.
                await client_write.aclose()
                await server_write.aclose()
                # Backstop: the dispatcher exits on EOF, but the server's
                # own teardown (lifespan __aexit__, connection.exit_stack
                # callbacks) runs after that and is user code. If it never
                # completes the join would hang forever, so bound the wait
                # and fall back to cancelling. The healthy path returns
                # from wait() without the timeout firing, so the cancel is
                # never reached and gh-106749 stays avoided. If the cancel
                # does fire, the checkpoint at the end of
                # `create_client_server_memory_streams` resyncs the tracer.
                with anyio.move_on_after(SERVER_SHUTDOWN_GRACE):
                    await server_done.wait()
                # Only cancel when the grace period expired without the
                # server finishing; a finished server needs no cancellation.
                if not server_done.is_set():
                    tg.cancel_scope.cancel()
97
98 async def __aenter__(self) -> TransportStreams:

Callers 2

__aenter__Method · 0.95
_build_sessionMethod · 0.80

Calls 3

waitMethod · 0.80
acloseMethod · 0.45

Tested by

no test coverage detected