MCPcopy
hub / github.com/modelcontextprotocol/python-sdk / _sse_scope

Function _sse_scope

tests/server/test_sse_security.py:206–228  ·  view source on GitHub ↗

Build an ASGI scope/receive/send triple for a request to the SSE transport.

(
    method: str, path: str, user: AuthenticatedUser | None, *, query_string: bytes = b"", body: bytes = b""
)

Source from the content-addressed store, hash-verified

204
205
206def _sse_scope(
207 method: str, path: str, user: AuthenticatedUser | None, *, query_string: bytes = b"", body: bytes = b""
208) -> tuple[Scope, Receive, Send, list[Message]]:
209 """Build an ASGI scope/receive/send triple for a request to the SSE transport."""
210 scope: Scope = {
211 "type": "http",
212 "method": method,
213 "path": path,
214 "root_path": "",
215 "query_string": query_string,
216 "headers": [(b"content-type", b"application/json")],
217 }
218 if user is not None:
219 scope["user"] = user
220 sent: list[Message] = []
221
222 async def receive() -> Message:
223 return {"type": "http.request", "body": body, "more_body": False}
224
225 async def send(message: Message) -> None:
226 sent.append(message)
227
228 return scope, receive, send, sent
229
230
231def _response_status(sent: list[Message]) -> int:

Calls

no outgoing calls

Tested by

no test coverage detected