StreamableHTTP client transport implementation.
| 71 | |
| 72 | |
| 73 | class StreamableHTTPTransport: |
| 74 | """StreamableHTTP client transport implementation.""" |
| 75 | |
| 76 | def __init__(self, url: str) -> None: |
| 77 | """Initialize the StreamableHTTP transport. |
| 78 | |
| 79 | Args: |
| 80 | url: The endpoint URL. |
| 81 | """ |
| 82 | self.url = url |
| 83 | self.session_id: str | None = None |
| 84 | # Captured from each stamped POST's metadata. Reused on outbound HTTP that carries |
| 85 | # no per-message header (transport-internal GET/DELETE, and dispatcher-written |
| 86 | # response/error/cancel POSTs that bypass the session's stamp). Cleared when an |
| 87 | # `initialize` POST goes out so a probe-stamped value cannot leak onto the handshake. |
| 88 | self._protocol_version_header: str | None = None |
| 89 | |
| 90 | def _prepare_headers(self) -> dict[str, str]: |
| 91 | """Build MCP-specific request headers for any outbound HTTP request. |
| 92 | |
| 93 | These are merged with the ``httpx.AsyncClient`` defaults (these take |
| 94 | precedence). The cached ``MCP-Protocol-Version`` is included whenever |
| 95 | present so messages that don't pass through the session's stamp — |
| 96 | response/error/cancel POSTs, transport-internal GET/DELETE — still |
| 97 | carry the negotiated version. Per-message headers are layered on top |
| 98 | by the caller. |
| 99 | """ |
| 100 | headers: dict[str, str] = { |
| 101 | "accept": "application/json, text/event-stream", |
| 102 | "content-type": "application/json", |
| 103 | } |
| 104 | if self.session_id: |
| 105 | headers[MCP_SESSION_ID] = self.session_id |
| 106 | if self._protocol_version_header: |
| 107 | headers[MCP_PROTOCOL_VERSION_HEADER] = self._protocol_version_header |
| 108 | return headers |
| 109 | |
| 110 | def _is_initialization_request(self, message: JSONRPCMessage) -> bool: |
| 111 | """Check if the message is an initialization request.""" |
| 112 | return isinstance(message, JSONRPCRequest) and message.method == "initialize" |
| 113 | |
| 114 | def _is_initialized_notification(self, message: JSONRPCMessage) -> bool: |
| 115 | """Check if the message is an initialized notification.""" |
| 116 | return isinstance(message, JSONRPCNotification) and message.method == "notifications/initialized" |
| 117 | |
| 118 | def _maybe_extract_session_id_from_response(self, response: httpx.Response) -> None: |
| 119 | """Extract and store session ID from response headers.""" |
| 120 | new_session_id = response.headers.get(MCP_SESSION_ID) |
| 121 | if new_session_id: |
| 122 | self.session_id = new_session_id |
| 123 | logger.info(f"Received session ID: {self.session_id}") |
| 124 | |
| 125 | async def _handle_sse_event( |
| 126 | self, |
| 127 | sse: ServerSentEvent, |
| 128 | read_stream_writer: StreamWriter, |
| 129 | original_request_id: RequestId | None = None, |
| 130 | resumption_callback: Callable[[str], Awaitable[None]] | None = None, |
no outgoing calls