hub / github.com/modelcontextprotocol/python-sdk / sse_writer

Method sse_writer

src/mcp/server/sse.py:165–178
()

Source from the content-addressed store, hash-verified

sse_stream_writer, sse_stream_reader = anyio.create_memory_object_stream[dict[str, Any]](0)

async def sse_writer():
    logger.debug("Starting SSE writer")
    async with sse_stream_writer, write_stream_reader:
        await sse_stream_writer.send({"event": "endpoint", "data": client_post_uri_data})
        logger.debug(f"Sent endpoint event: {client_post_uri_data}")

        async for session_message in write_stream_reader:
            logger.debug(f"Sending message via SSE: {session_message}")
            await sse_stream_writer.send(
                {
                    "event": "message",
                    "data": session_message.message.model_dump_json(by_alias=True, exclude_unset=True),
                }
            )

try:
    async with anyio.create_task_group() as tg:
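
The excerpt depends on names defined elsewhere in sse.py (write_stream_reader, client_post_uri_data, logger), so it cannot be run on its own. The sketch below is a minimal, standard-library illustration of the same flow: one "endpoint" event is emitted first, then each outgoing message is forwarded as a "message" event until the source closes. It is not the SDK's implementation. It swaps anyio memory object streams for asyncio.Queue plus a sentinel, and the names sse_writer_sketch, run_demo, _CLOSED, and the example endpoint URI are all hypothetical.

```python
import asyncio
import json
from typing import Any

# Hypothetical sentinel standing in for "the stream was closed".
# The original code relies on anyio stream closure instead.
_CLOSED = object()


async def sse_writer_sketch(
    outgoing: asyncio.Queue, sse_events: asyncio.Queue, endpoint: str
) -> None:
    # The first event tells the client where to POST its messages.
    await sse_events.put({"event": "endpoint", "data": endpoint})
    # Forward every outgoing message as a "message" event until the source closes.
    while True:
        message = await outgoing.get()
        if message is _CLOSED:
            break
        await sse_events.put({"event": "message", "data": json.dumps(message)})
    # Propagate closure to the consumer of the event queue.
    await sse_events.put(_CLOSED)


async def run_demo(endpoint: str, messages: list) -> list:
    # maxsize=1 approximates back-pressure; the original uses a zero-buffer stream.
    outgoing: asyncio.Queue = asyncio.Queue(maxsize=1)
    sse_events: asyncio.Queue = asyncio.Queue(maxsize=1)

    async def producer() -> None:
        for message in messages:
            await outgoing.put(message)
        await outgoing.put(_CLOSED)

    async def consumer() -> list:
        received: list = []
        while (event := await sse_events.get()) is not _CLOSED:
            received.append(event)
        return received

    # gather returns results in argument order; the consumer's list is last.
    _, _, received = await asyncio.gather(
        sse_writer_sketch(outgoing, sse_events, endpoint), producer(), consumer()
    )
    return received


if __name__ == "__main__":
    demo_messages: list[dict[str, Any]] = [{"jsonrpc": "2.0", "id": 1, "result": {}}]
    for event in asyncio.run(run_demo("/messages/?session_id=abc", demo_messages)):
        print(event)
```

As in the excerpt, the handshake event is sent before any message is read, so a consumer always sees the endpoint first regardless of how quickly the producer runs.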

Callers

nothing calls this directly

Calls 2

debug · Method · 0.80
send · Method · 0.45

Tested by

no test coverage detected