MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / sse_reader

Function sse_reader

src/mcp/client/sse.py:65–116  ·  view source on GitHub ↗
(task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED)

Source from the content-addressed store, hash-verified

63 write_stream, write_stream_reader = create_context_streams[SessionMessage](0)
64
65 async def sse_reader(task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED):
66 try:
67 async for sse in event_source.aiter_sse(): # pragma: no branch
68 logger.debug(f"Received SSE event: {sse.event}")
69 match sse.event:
70 case "endpoint":
71 endpoint_url = urljoin(url, sse.data)
72 logger.debug(f"Received endpoint URL: {endpoint_url}")
73
74 url_parsed = urlparse(url)
75 endpoint_parsed = urlparse(endpoint_url)
76 if ( # pragma: no cover
77 url_parsed.netloc != endpoint_parsed.netloc
78 or url_parsed.scheme != endpoint_parsed.scheme
79 ):
80 error_msg = ( # pragma: no cover
81 f"Endpoint origin does not match connection origin: {endpoint_url}"
82 )
83 logger.error(error_msg) # pragma: no cover
84 raise ValueError(error_msg) # pragma: no cover
85
86 if on_session_created:
87 session_id = _extract_session_id_from_endpoint(endpoint_url)
88 if session_id:
89 on_session_created(session_id)
90
91 task_status.started(endpoint_url)
92
93 case "message":
94 # Skip empty data (keep-alive pings)
95 if not sse.data:
96 continue
97 try:
98 message = types.jsonrpc_message_adapter.validate_json(sse.data, by_name=False)
99 logger.debug(f"Received server message: {message}")
100 except Exception as exc: # pragma: no cover
101 logger.exception("Error parsing server message") # pragma: no cover
102 await read_stream_writer.send(exc) # pragma: no cover
103 continue # pragma: no cover
104
105 session_message = SessionMessage(message)
106 await read_stream_writer.send(session_message)
107 case _: # pragma: no cover
108 logger.warning(f"Unknown SSE event: {sse.event}") # pragma: no cover
109 except SSEError as sse_exc: # pragma: lax no cover
110 logger.exception("Encountered SSE exception")
111 raise sse_exc
112 except Exception as exc: # pragma: lax no cover
113 logger.exception("Error in sse_reader")
114 await read_stream_writer.send(exc)
115 finally:
116 await read_stream_writer.aclose()
117
118 async def post_writer(endpoint_url: str):
119 try:

Callers

nothing calls this directly

Calls 7

SessionMessageClass · 0.90
debugMethod · 0.80
errorMethod · 0.80
warningMethod · 0.80
sendMethod · 0.45
acloseMethod · 0.45

Tested by

no test coverage detected