MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / unicode_session

Function unicode_session

tests/client/test_http_unicode.py:99–124  ·  view source on GitHub ↗

Yield an initialized ClientSession speaking streamable HTTP (SSE responses) to the Unicode test server, entirely in process.

()

Source from the content-addressed store, hash-verified

97
98@asynccontextmanager
99async def unicode_session() -> AsyncIterator[ClientSession]:
100 """Yield an initialized ClientSession speaking streamable HTTP (SSE responses) to the
101 Unicode test server, entirely in process."""
102 server = Server(
103 name="unicode_test_server",
104 on_list_tools=handle_list_tools,
105 on_call_tool=handle_call_tool,
106 on_list_prompts=handle_list_prompts,
107 on_get_prompt=handle_get_prompt,
108 )
109 # SSE response mode, so Unicode rides the SSE event encoding rather than a plain JSON body.
110 session_manager = StreamableHTTPSessionManager(app=server, json_response=False)
111 app = Starlette(routes=[Mount("/mcp", app=session_manager.handle_request)])
112
113 async with (
114 session_manager.run(),
115 # follow_redirects matches the SDK's own client factory; Starlette's Mount 307-redirects
116 # the bare /mcp path to /mcp/.
117 httpx.AsyncClient(
118 transport=StreamingASGITransport(app), base_url=BASE_URL, follow_redirects=True
119 ) as http_client,
120 streamable_http_client(f"{BASE_URL}/mcp", http_client=http_client) as (read_stream, write_stream),
121 ClientSession(read_stream, write_stream) as session,
122 ):
123 await session.initialize()
124 yield session
125
126
127@pytest.mark.anyio

Calls 7

runMethod · 0.95
ServerClass · 0.90
streamable_http_clientFunction · 0.90
ClientSessionClass · 0.90
initializeMethod · 0.45

Tested by

no test coverage detected