(task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED)
| 63 | write_stream, write_stream_reader = create_context_streams[SessionMessage](0) |
| 64 | |
async def sse_reader(task_status: TaskStatus[str] = anyio.TASK_STATUS_IGNORED):
    """Consume events from ``event_source`` and forward them to the read stream.

    Handling per SSE event type:

    * ``"endpoint"`` - the event data is joined onto ``url``. If the resulting
      scheme or netloc differs from that of ``url``, the mismatch is logged
      and a ``ValueError`` is raised. Otherwise ``on_session_created`` (when
      set) is invoked with the session id extracted from the endpoint, if
      any, and ``task_status.started`` is called with the endpoint URL.
    * ``"message"`` - events with empty data are skipped. Other data is
      validated as a JSON-RPC message, wrapped in ``SessionMessage`` and sent
      on ``read_stream_writer``. A validation failure is logged and the
      exception object itself is sent on the stream instead.
    * anything else - logged as a warning.

    ``SSEError`` is logged and re-raised. Any other exception escaping the
    loop is logged and sent on ``read_stream_writer``. The writer is closed
    on every exit path.
    """
    try:
        async for event in event_source.aiter_sse():  # pragma: no branch
            kind = event.event
            logger.debug(f"Received SSE event: {kind}")

            if kind == "endpoint":
                resolved_endpoint = urljoin(url, event.data)
                logger.debug(f"Received endpoint URL: {resolved_endpoint}")

                # The advertised endpoint must share scheme and host with the
                # URL this connection was opened against.
                base_parts = urlparse(url)
                target_parts = urlparse(resolved_endpoint)
                same_origin = (
                    base_parts.netloc == target_parts.netloc
                    and base_parts.scheme == target_parts.scheme
                )
                if not same_origin:  # pragma: no cover
                    error_msg = (  # pragma: no cover
                        f"Endpoint origin does not match connection origin: {resolved_endpoint}"
                    )
                    logger.error(error_msg)  # pragma: no cover
                    raise ValueError(error_msg)  # pragma: no cover

                if on_session_created:
                    extracted_id = _extract_session_id_from_endpoint(resolved_endpoint)
                    if extracted_id:
                        on_session_created(extracted_id)

                task_status.started(resolved_endpoint)

            elif kind == "message":
                payload = event.data
                # Skip empty data (keep-alive pings)
                if not payload:
                    continue

                try:
                    parsed = types.jsonrpc_message_adapter.validate_json(payload, by_name=False)
                    logger.debug(f"Received server message: {parsed}")
                except Exception as parse_error:  # pragma: no cover
                    logger.exception("Error parsing server message")  # pragma: no cover
                    await read_stream_writer.send(parse_error)  # pragma: no cover
                else:
                    await read_stream_writer.send(SessionMessage(parsed))

            else:  # pragma: no cover
                logger.warning(f"Unknown SSE event: {kind}")  # pragma: no cover
    except SSEError as sse_failure:  # pragma: lax no cover
        logger.exception("Encountered SSE exception")
        raise sse_failure
    except Exception as reader_failure:  # pragma: lax no cover
        logger.exception("Error in sse_reader")
        await read_stream_writer.send(reader_failure)
    finally:
        await read_stream_writer.aclose()
| 117 | |
| 118 | async def post_writer(endpoint_url: str): |
| 119 | try: |
nothing calls this directly
no test coverage detected