Build an ASGI scope/receive/send triple for a request to the SSE transport.
(
method: str, path: str, user: AuthenticatedUser | None, *, query_string: bytes = b"", body: bytes = b""
)
| 204 | |
| 205 | |
| 206 | def _sse_scope( |
| 207 | method: str, path: str, user: AuthenticatedUser | None, *, query_string: bytes = b"", body: bytes = b"" |
| 208 | ) -> tuple[Scope, Receive, Send, list[Message]]: |
| 209 | """Build an ASGI scope/receive/send triple for a request to the SSE transport.""" |
| 210 | scope: Scope = { |
| 211 | "type": "http", |
| 212 | "method": method, |
| 213 | "path": path, |
| 214 | "root_path": "", |
| 215 | "query_string": query_string, |
| 216 | "headers": [(b"content-type", b"application/json")], |
| 217 | } |
| 218 | if user is not None: |
| 219 | scope["user"] = user |
| 220 | sent: list[Message] = [] |
| 221 | |
| 222 | async def receive() -> Message: |
| 223 | return {"type": "http.request", "body": body, "more_body": False} |
| 224 | |
| 225 | async def send(message: Message) -> None: |
| 226 | sent.append(message) |
| 227 | |
| 228 | return scope, receive, send, sent |
| 229 | |
| 230 | |
| 231 | def _response_status(sent: list[Message]) -> int: |
no outgoing calls
no test coverage detected