MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / StreamableHTTPTransport

Class StreamableHTTPTransport

src/mcp/client/streamable_http.py:73–520  ·  view source on GitHub ↗

StreamableHTTP client transport implementation.

Source from the content-addressed store, hash-verified

71
72
73class StreamableHTTPTransport:
74 """StreamableHTTP client transport implementation."""
75
76 def __init__(self, url: str) -> None:
77 """Initialize the StreamableHTTP transport.
78
79 Args:
80 url: The endpoint URL.
81 """
82 self.url = url
83 self.session_id: str | None = None
84 # Captured from each stamped POST's metadata. Reused on outbound HTTP that carries
85 # no per-message header (transport-internal GET/DELETE, and dispatcher-written
86 # response/error/cancel POSTs that bypass the session's stamp). Cleared when an
87 # `initialize` POST goes out so a probe-stamped value cannot leak onto the handshake.
88 self._protocol_version_header: str | None = None
89
90 def _prepare_headers(self) -> dict[str, str]:
91 """Build MCP-specific request headers for any outbound HTTP request.
92
93 These are merged with the ``httpx.AsyncClient`` defaults (these take
94 precedence). The cached ``MCP-Protocol-Version`` is included whenever
95 present so messages that don't pass through the session's stamp —
96 response/error/cancel POSTs, transport-internal GET/DELETE — still
97 carry the negotiated version. Per-message headers are layered on top
98 by the caller.
99 """
100 headers: dict[str, str] = {
101 "accept": "application/json, text/event-stream",
102 "content-type": "application/json",
103 }
104 if self.session_id:
105 headers[MCP_SESSION_ID] = self.session_id
106 if self._protocol_version_header:
107 headers[MCP_PROTOCOL_VERSION_HEADER] = self._protocol_version_header
108 return headers
109
110 def _is_initialization_request(self, message: JSONRPCMessage) -> bool:
111 """Check if the message is an initialization request."""
112 return isinstance(message, JSONRPCRequest) and message.method == "initialize"
113
114 def _is_initialized_notification(self, message: JSONRPCMessage) -> bool:
115 """Check if the message is an initialized notification."""
116 return isinstance(message, JSONRPCNotification) and message.method == "notifications/initialized"
117
118 def _maybe_extract_session_id_from_response(self, response: httpx.Response) -> None:
119 """Extract and store session ID from response headers."""
120 new_session_id = response.headers.get(MCP_SESSION_ID)
121 if new_session_id:
122 self.session_id = new_session_id
123 logger.info(f"Received session ID: {self.session_id}")
124
125 async def _handle_sse_event(
126 self,
127 sse: ServerSentEvent,
128 read_stream_writer: StreamWriter,
129 original_request_id: RequestId | None = None,
130 resumption_callback: Callable[[str], Awaitable[None]] | None = None,

Callers 2

streamable_http_clientFunction · 0.85

Calls

no outgoing calls

Tested by 1