Server transport for stdio: this communicates with an MCP client by reading from the current process' stdin and writing to stdout.
(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.AsyncFile[str] | None = None)
| 31 | |
| 32 | @asynccontextmanager |
| 33 | async def stdio_server(stdin: anyio.AsyncFile[str] | None = None, stdout: anyio.AsyncFile[str] | None = None): |
| 34 | """Server transport for stdio: this communicates with an MCP client by reading |
| 35 | from the current process' stdin and writing to stdout. |
| 36 | """ |
| 37 | # Purposely not using context managers for these, as we don't want to close |
| 38 | # standard process handles. Encoding of stdin/stdout as text streams on |
| 39 | # python is platform-dependent (Windows is particularly problematic), so we |
| 40 | # re-wrap the underlying binary stream to ensure UTF-8. |
| 41 | if not stdin: |
| 42 | stdin = anyio.wrap_file(TextIOWrapper(sys.stdin.buffer, encoding="utf-8", errors="replace")) |
| 43 | if not stdout: |
| 44 | stdout = anyio.wrap_file(TextIOWrapper(sys.stdout.buffer, encoding="utf-8")) |
| 45 | |
| 46 | read_stream_writer, read_stream = create_context_streams[SessionMessage | Exception](0) |
| 47 | write_stream, write_stream_reader = create_context_streams[SessionMessage](0) |
| 48 | |
| 49 | async def stdin_reader(): |
| 50 | try: |
| 51 | async with read_stream_writer: |
| 52 | async for line in stdin: |
| 53 | try: |
| 54 | message = types.jsonrpc_message_adapter.validate_json(line, by_name=False) |
| 55 | except Exception as exc: |
| 56 | await read_stream_writer.send(exc) |
| 57 | continue |
| 58 | |
| 59 | session_message = SessionMessage(message) |
| 60 | await read_stream_writer.send(session_message) |
| 61 | except anyio.ClosedResourceError: # pragma: no cover |
| 62 | await anyio.lowlevel.checkpoint() |
| 63 | |
| 64 | async def stdout_writer(): |
| 65 | try: |
| 66 | async with write_stream_reader: |
| 67 | async for session_message in write_stream_reader: |
| 68 | json = session_message.message.model_dump_json(by_alias=True, exclude_unset=True) |
| 69 | await stdout.write(json + "\n") |
| 70 | await stdout.flush() |
| 71 | except anyio.ClosedResourceError: # pragma: no cover |
| 72 | await anyio.lowlevel.checkpoint() |
| 73 | |
| 74 | async with anyio.create_task_group() as tg: |
| 75 | tg.start_soon(stdin_reader) |
| 76 | tg.start_soon(stdout_writer) |
| 77 | yield read_stream, write_stream |
no outgoing calls