The SSE GET responds 200 with an event-stream content type, announcing the session endpoint as its first event.
()
| 110 | |
@pytest.mark.anyio
async def test_raw_sse_connection() -> None:
    """A raw GET on ``/sse`` opens an event stream whose first event advertises the session endpoint.

    Checks the 200 status, the exact ``text/event-stream`` content type, and that the first two
    lines of the body are the ``endpoint`` event followed by its ``/messages/?session_id=`` data line.
    """
    transport = StreamingASGITransport(make_server_app(), cancel_on_close=False)
    client = httpx.AsyncClient(transport=transport, base_url=BASE_URL)

    # Bound the whole exchange so a stream that never yields fails the test instead of hanging.
    with anyio.fail_after(5):
        async with client:
            async with client.stream("GET", "/sse") as response:
                assert response.status_code == 200
                assert response.headers["content-type"] == "text/event-stream; charset=utf-8"

                # Only the first two lines are consumed; the stream is closed on context exit.
                sse_lines = response.aiter_lines()
                event_line = await anext(sse_lines)
                assert event_line == "event: endpoint"
                data_line = await anext(sse_lines)
                assert data_line.startswith("data: /messages/?session_id=")
| 127 | |
| 128 | |
| 129 | @pytest.mark.anyio |
nothing calls this directly
no test coverage detected