In-memory transport for testing MCP servers without network overhead. This transport starts the server in a background task and provides streams for client-side communication. The server is automatically stopped when the context manager exits.
| 19 | |
| 20 | |
| 21 | class InMemoryTransport: |
| 22 | """In-memory transport for testing MCP servers without network overhead. |
| 23 | |
| 24 | This transport starts the server in a background task and provides |
| 25 | streams for client-side communication. The server is automatically |
| 26 | stopped when the context manager exits. |
| 27 | """ |
| 28 | |
def __init__(
    self,
    server: Server[Any] | MCPServer,
    *,
    raise_exceptions: bool = False,
) -> None:
    """Record the transport configuration; no connection is made here.

    Args:
        server: The MCP server to talk to. Either a low-level ``Server`` or
            an ``MCPServer`` wrapper is accepted and stored as given.
        raise_exceptions: Keyword-only flag controlling whether exceptions
            from the server are raised. Defaults to ``False``.
    """
    # No context manager exists yet at construction time.
    # NOTE(review): presumably populated when the transport is entered — confirm.
    self._cm: AbstractAsyncContextManager[TransportStreams] | None = None
    self._raise_exceptions = raise_exceptions
    self._server = server
| 39 | |
| 40 | @asynccontextmanager |
| 41 | async def _connect(self) -> AsyncIterator[TransportStreams]: |
| 42 | """Connect to the server and yield streams for communication.""" |
| 43 | # Unwrap MCPServer to get underlying Server |
| 44 | if isinstance(self._server, MCPServer): |
| 45 | # TODO(Marcelo): Make `lowlevel_server` public. |
| 46 | actual_server: Server[Any] = self._server._lowlevel_server # type: ignore[reportPrivateUsage] |
| 47 | else: |
| 48 | actual_server = self._server |
| 49 | |
| 50 | async with create_client_server_memory_streams() as (client_streams, server_streams): |
| 51 | client_read, client_write = client_streams |
| 52 | server_read, server_write = server_streams |
| 53 | |
| 54 | server_done = anyio.Event() |
| 55 | |
| 56 | async def _run_server() -> None: |
| 57 | try: |
| 58 | await actual_server.run( |
| 59 | server_read, |
| 60 | server_write, |
| 61 | actual_server.create_initialization_options(), |
| 62 | raise_exceptions=self._raise_exceptions, |
| 63 | ) |
| 64 | finally: |
| 65 | server_done.set() |
| 66 | |
| 67 | async with anyio.create_task_group() as tg: |
| 68 | tg.start_soon(_run_server) |
| 69 | |
| 70 | try: |
| 71 | yield client_read, client_write |
| 72 | finally: |
| 73 | # EOF the server (and our own read side) instead of |
| 74 | # cancelling outright. The dispatcher's run() cancels its |
| 75 | # own in-flight handlers on read-stream EOF, so for a |
| 76 | # well-behaved server the task exits naturally and the |
| 77 | # task-group join below is immediate. Cancelling here |
| 78 | # unconditionally would `coro.throw()` into this task, |
no outgoing calls