hub / github.com/modelcontextprotocol/python-sdk / hold_sse_connection

Function hold_sse_connection

tests/server/test_sse_security.py:294–301

Establish the SSE session as `creator` and keep it open, as a server would.

()

Source from the content-addressed store, hash-verified

292  sender_user = _authenticated_user(*sender) if sender is not None else None
293
294  async def hold_sse_connection() -> None:
295      """Establish the SSE session as `creator` and keep it open, as a server would."""
296      scope, _, _, _ = _sse_scope("GET", "/sse", creator_user)
297      with anyio.fail_after(5):
298          async with transport.connect_sse(scope, get_receive, get_send) as (read_stream, write_stream):
299              async with read_stream, write_stream:  # pragma: no branch
300                  async for _ in read_stream:
301                      pass
302
303  async with anyio.create_task_group() as tg:
304      tg.start_soon(hold_sse_connection)
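
The listing shows a common test pattern: a background task opens a stream, drains it until it closes, and is bounded by a deadline so a hung connection fails the test instead of blocking it. Below is a minimal sketch of that pattern only. It swaps in the standard library's asyncio for anyio so it runs without extra dependencies, and an asyncio.Queue stands in for read_stream. The names hold_connection, demo, and _CLOSED are hypothetical and do not appear in the SDK.

```python
import asyncio

# Hypothetical sentinel: stands in for the read stream being closed.
_CLOSED = object()


async def hold_connection(stream: asyncio.Queue, received: list) -> None:
    """Drain `stream` until it is closed, keeping the 'connection' open meanwhile."""
    while True:
        item = await stream.get()
        if item is _CLOSED:
            return
        received.append(item)


async def demo() -> list:
    stream: asyncio.Queue = asyncio.Queue()
    received: list = []

    # Schedule the holder in the background with a 5-second deadline,
    # loosely analogous to start_soon plus fail_after(5) in the listing.
    holder = asyncio.create_task(
        asyncio.wait_for(hold_connection(stream, received), timeout=5)
    )

    # The "test body": push two events, then close the stream.
    await stream.put("endpoint")
    await stream.put("message")
    await stream.put(_CLOSED)

    # Waiting on the holder surfaces a timeout error if it never finished.
    await holder
    return received


result = asyncio.run(demo())
```

In this sketch the deadline wraps the whole lifetime of the holder, so the stream must be closed within five seconds of the task starting; the listing's anyio.fail_after(5) places its deadline around connect_sse in a similar way.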

Callers

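No direct calls detected. The enclosing test schedules it as a background task via tg.start_soon(hold_sse_connection) (line 304).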

Calls 2

_sse_scope · Function · 0.85
connect_sse · Method · 0.80

Tested by

no test coverage detected