Establish the SSE session as `creator` and keep it open, as a server would.
()
| 292 | sender_user = _authenticated_user(*sender) if sender is not None else None |
| 293 | |
| 294 | async def hold_sse_connection() -> None: |
| 295 | """Establish the SSE session as `creator` and keep it open, as a server would.""" |
| 296 | scope, _, _, _ = _sse_scope("GET", "/sse", creator_user) |
| 297 | with anyio.fail_after(5): |
| 298 | async with transport.connect_sse(scope, get_receive, get_send) as (read_stream, write_stream): |
| 299 | async with read_stream, write_stream: # pragma: no branch |
| 300 | async for _ in read_stream: |
| 301 | pass |
| 302 | |
| 303 | async with anyio.create_task_group() as tg: |
| 304 | tg.start_soon(hold_sse_connection) |
nothing calls this directly
no test coverage detected