hub / github.com/modelcontextprotocol/python-sdk / handle_sse

Function handle_sse

tests/server/test_sse_security.py:48–57
(request: Request)

Source from the content-addressed store, hash-verified

46  sse_transport = SseServerTransport("/messages/", security_settings)
47
48  async def handle_sse(request: Request) -> Response:
49      try:
50          async with sse_transport.connect_sse(request.scope, request.receive, request._send) as (read, write):
51              await server.run(read, write, server.create_initialization_options())
52      except ValueError as e:
53          # Validation error was already handled inside connect_sse, which sent the rejection
54          # response itself; its non-empty body checkpoints, so the test reads the rejection
55          # status before the trailing Response() below sends a second response start.
56          logger.debug(f"SSE connection failed validation: {e}")
57      return Response()
58
59  app = Starlette(
60      routes=[
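
The comment in the handler describes a pattern where the connect step sends the rejection response on its own and then raises `ValueError`, leaving the caller with nothing to do but log. The following is a minimal, stdlib-only sketch of that control flow, not the SDK's implementation: `connect`, `handle`, `run_once`, `ALLOWED_HOSTS`, the message dictionaries, and the 421 status are all hypothetical stand-ins chosen for illustration.

```python
import asyncio
import logging
from contextlib import asynccontextmanager

logger = logging.getLogger(__name__)

# Hypothetical allow-list; the real transport takes its rules from security settings.
ALLOWED_HOSTS = {"127.0.0.1", "localhost"}


@asynccontextmanager
async def connect(headers, send):
    """Hypothetical stand-in for a transport's connect step."""
    host = headers.get("host", "")
    if host not in ALLOWED_HOSTS:
        # Send the rejection first, then raise, so the caller has nothing left to send.
        await send({"status": 421, "body": b"Invalid Host header"})
        raise ValueError("Request validation failed")
    yield "read-stream", "write-stream"


async def handle(headers, send):
    """Mirrors the try/except shape of handle_sse without Starlette or the SDK."""
    try:
        async with connect(headers, send) as (read, write):
            # Placeholder for the real work done with the two streams.
            await send({"status": 200, "body": b"connected"})
    except ValueError as e:
        # The rejection was already sent inside connect(); only log here.
        logger.debug("connection failed validation: %s", e)


def run_once(headers):
    """Drive handle() once and return every message passed to send()."""
    sent = []

    async def send(message):
        sent.append(message)

    asyncio.run(handle(headers, send))
    return sent
```

Calling `run_once({"host": "evil.example"})` records exactly one message, the rejection sent from inside `connect`, which is the behavior the comment relies on: the first response the client sees is the rejection, and the handler adds nothing of its own in the `except` branch.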

Callers

nothing calls this directly

Calls 4

connect_sse · Method · 0.80
debug · Method · 0.80
run · Method · 0.45

Tested by

no test coverage detected