An httpx client whose requests are served in process by an SSE app with the given settings.
(security_settings: TransportSecuritySettings | None = None)
| 41 | |
| 42 | |
def sse_security_client(security_settings: TransportSecuritySettings | None = None) -> httpx.AsyncClient:
    """Build an httpx client backed by an in-process SSE application.

    A fresh server and SSE transport are created on every call, wired into a
    Starlette app exposing ``/sse`` (the event stream) and ``/messages/`` (the
    POST endpoint), and the app is handed to httpx through an ASGI bridge.

    Args:
        security_settings: Passed straight through to the SSE transport;
            ``None`` leaves the choice of settings to the transport.

    Returns:
        An ``httpx.AsyncClient`` rooted at ``BASE_URL`` whose requests are
        answered by the app built here.
    """
    mcp_server = Server(SERVER_NAME)
    sse = SseServerTransport("/messages/", security_settings)

    async def handle_sse(request: Request) -> Response:
        try:
            async with sse.connect_sse(request.scope, request.receive, request._send) as streams:
                read_stream, write_stream = streams
                init_options = mcp_server.create_initialization_options()
                await mcp_server.run(read_stream, write_stream, init_options)
        except ValueError as exc:
            # connect_sse has already answered the request with its own rejection
            # response. That response has a non-empty body, which checkpoints, so
            # the test observes the rejection status before the Response() returned
            # below issues a second response start.
            logger.debug(f"SSE connection failed validation: {exc}")
        return Response()

    sse_routes = [
        Route("/sse", endpoint=handle_sse),
        Mount("/messages/", app=sse.handle_post_message),
    ]
    asgi_app = Starlette(routes=sse_routes)

    # The SSE GET only finishes once it sees a disconnect, so closing the bridge
    # must allow the application to drain instead of cancelling it.
    bridge = StreamingASGITransport(asgi_app, cancel_on_close=False)
    return httpx.AsyncClient(transport=bridge, base_url=BASE_URL)
| 69 | |
| 70 | |
| 71 | @pytest.mark.anyio |
no test coverage detected