Connect to the server and yield streams for communication.
(self)
| 39 | |
@asynccontextmanager
async def _connect(self) -> AsyncIterator[TransportStreams]:
    """Run the wrapped server on in-memory streams and yield the client-side pair.

    The server is started as a background task inside a task group. The
    caller receives ``(client_read, client_write)``. When the caller's
    block ends, both write ends are closed so the server sees EOF, and the
    server task is cancelled only if it has not finished within
    ``SERVER_SHUTDOWN_GRACE``.
    """
    # An MCPServer is a wrapper; what actually gets run is the Server inside it.
    lowlevel: Server[Any]
    if not isinstance(self._server, MCPServer):
        lowlevel = self._server
    else:
        # TODO(Marcelo): Make `lowlevel_server` public.
        lowlevel = self._server._lowlevel_server  # type: ignore[reportPrivateUsage]

    async with create_client_server_memory_streams() as (
        (client_read, client_write),
        (server_read, server_write),
    ):
        # Set by the server task on the way out, whether it exits cleanly or not.
        server_exited = anyio.Event()

        async def _run_server() -> None:
            try:
                await lowlevel.run(
                    server_read,
                    server_write,
                    lowlevel.create_initialization_options(),
                    raise_exceptions=self._raise_exceptions,
                )
            finally:
                server_exited.set()

        async with anyio.create_task_group() as tg:
            tg.start_soon(_run_server)

            try:
                yield client_read, client_write
            finally:
                # Shut down by EOF rather than by cancellation: close the
                # server's input (and our own read side). On read-stream
                # EOF the dispatcher's run() cancels its in-flight handlers
                # itself, so a well-behaved server task finishes on its own
                # and the task-group join below returns immediately.
                # An unconditional cancel here would `coro.throw()` into
                # this task; on CPython 3.11 (gh-106749) that drops `'call'`
                # trace events for the outer await chain and desyncs
                # coverage's CTracer past the test frame.
                await client_write.aclose()
                await server_write.aclose()
                # Safety net: after the dispatcher exits, the server still
                # runs its own teardown (lifespan __aexit__,
                # connection.exit_stack callbacks), which is user code and
                # may never finish. Wait a bounded time, then cancel. On
                # the healthy path wait() returns before the timeout, the
                # cancel is never reached, and gh-106749 stays avoided. If
                # the cancel does fire, the checkpoint at the end of
                # `create_client_server_memory_streams` resyncs the tracer.
                with anyio.move_on_after(SERVER_SHUTDOWN_GRACE):
                    await server_exited.wait()
                if not server_exited.is_set():
                    tg.cancel_scope.cancel()
| 97 | |
| 98 | async def __aenter__(self) -> TransportStreams: |
no test coverage detected