MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / post_writer

Function post_writer

src/mcp/client/sse.py:118–143  ·  view source on GitHub ↗
(endpoint_url: str)

Source from the content-addressed store, hash-verified

116 await read_stream_writer.aclose()
117
118 async def post_writer(endpoint_url: str):
119 try:
120 async with write_stream_reader, write_stream:
121
122 async def _send_message(session_message: SessionMessage) -> None:
123 logger.debug(f"Sending client message: {session_message}")
124 response = await client.post(
125 endpoint_url,
126 json=session_message.message.model_dump(
127 by_alias=True,
128 mode="json",
129 exclude_unset=True,
130 ),
131 )
132 response.raise_for_status()
133 logger.debug(f"Client message sent successfully: {response.status_code}")
134
135 async for session_message in write_stream_reader:
136 sender_ctx = write_stream_reader.last_context
137 if sender_ctx is not None:
138 async with anyio.create_task_group() as tg:
139 sender_ctx.run(tg.start_soon, _send_message, session_message)
140 else:
141 await _send_message(session_message) # pragma: no cover
142 except Exception: # pragma: lax no cover
143 logger.exception("Error in post_writer")
144
145 # On Python 3.14, coverage.py reports a phantom branch arc on this
146 # line (->yield) when nested two async-with levels deep. The branch

Callers

nothing calls this directly

Calls 2

_send_messageFunction · 0.85
runMethod · 0.45

Tested by

no test coverage detected