(scope: Scope, receive: Receive, send: Send)
| 923 | sse = SseServerTransport(message_path, security_settings=transport_security) |
| 924 | |
| 925 | async def handle_sse(scope: Scope, receive: Receive, send: Send): # pragma: no cover |
| 926 | # Add client ID from auth context into request context if available |
| 927 | |
| 928 | async with sse.connect_sse(scope, receive, send) as streams: |
| 929 | await self._lowlevel_server.run( |
| 930 | streams[0], streams[1], self._lowlevel_server.create_initialization_options() |
| 931 | ) |
| 932 | return Response() |
| 933 | |
| 934 | # Create routes |
| 935 | routes: list[Route | Mount] = [] |
nothing calls this directly
no test coverage detected