MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / test_raw_sse_connection

Function test_raw_sse_connection

tests/shared/test_sse.py:112–126  ·  view source on GitHub ↗

The SSE GET responds 200 with an event-stream content type, announcing the session endpoint as its first event.

()

Source from the content-addressed store, hash-verified

110
111@pytest.mark.anyio
112async def test_raw_sse_connection() -> None:
113 """The SSE GET responds 200 with an event-stream content type, announcing the session
114 endpoint as its first event."""
115 http_client = httpx.AsyncClient(
116 transport=StreamingASGITransport(make_server_app(), cancel_on_close=False), base_url=BASE_URL
117 )
118
119 with anyio.fail_after(5):
120 async with http_client, http_client.stream("GET", "/sse") as response:
121 assert response.status_code == 200
122 assert response.headers["content-type"] == "text/event-stream; charset=utf-8"
123
124 lines = response.aiter_lines()
125 assert await anext(lines) == "event: endpoint"
126 assert (await anext(lines)).startswith("data: /messages/?session_id=")
127
128
129@pytest.mark.anyio

Callers

nothing calls this directly

Calls 2

make_server_appFunction · 0.85

Tested by

no test coverage detected