MCPcopy Index your code
hub / github.com/modelcontextprotocol/python-sdk / InMemoryTransport

Class InMemoryTransport

src/mcp/client/_memory.py:21–109  ·  view source on GitHub ↗

In-memory transport for testing MCP servers without network overhead. This transport starts the server in a background task and provides streams for client-side communication. The server is automatically stopped when the context manager exits.

Source from the content-addressed store, hash-verified

19
20
21class InMemoryTransport:
22 """In-memory transport for testing MCP servers without network overhead.
23
24 This transport starts the server in a background task and provides
25 streams for client-side communication. The server is automatically
26 stopped when the context manager exits.
27 """
28
29 def __init__(self, server: Server[Any] | MCPServer, *, raise_exceptions: bool = False) -> None:
30 """Initialize the in-memory transport.
31
32 Args:
33 server: The MCP server to connect to (Server or MCPServer instance)
34 raise_exceptions: Whether to raise exceptions from the server
35 """
36 self._server = server
37 self._raise_exceptions = raise_exceptions
38 self._cm: AbstractAsyncContextManager[TransportStreams] | None = None
39
40 @asynccontextmanager
41 async def _connect(self) -> AsyncIterator[TransportStreams]:
42 """Connect to the server and yield streams for communication."""
43 # Unwrap MCPServer to get underlying Server
44 if isinstance(self._server, MCPServer):
45 # TODO(Marcelo): Make `lowlevel_server` public.
46 actual_server: Server[Any] = self._server._lowlevel_server # type: ignore[reportPrivateUsage]
47 else:
48 actual_server = self._server
49
50 async with create_client_server_memory_streams() as (client_streams, server_streams):
51 client_read, client_write = client_streams
52 server_read, server_write = server_streams
53
54 server_done = anyio.Event()
55
56 async def _run_server() -> None:
57 try:
58 await actual_server.run(
59 server_read,
60 server_write,
61 actual_server.create_initialization_options(),
62 raise_exceptions=self._raise_exceptions,
63 )
64 finally:
65 server_done.set()
66
67 async with anyio.create_task_group() as tg:
68 tg.start_soon(_run_server)
69
70 try:
71 yield client_read, client_write
72 finally:
73 # EOF the server (and our own read side) instead of
74 # cancelling outright. The dispatcher's run() cancels its
75 # own in-flight handlers on read-stream EOF, so for a
76 # well-behaved server the task exits naturally and the
77 # task-group join below is immediate. Cancelling here
78 # unconditionally would `coro.throw()` into this task,

Calls

no outgoing calls