POST a message to an SSE session as `user` and return the response status.
(transport: SseServerTransport, session_id: str, user: AuthenticatedUser | None)
| 234 | |
| 235 | |
async def _post_message(transport: SseServerTransport, session_id: str, user: AuthenticatedUser | None) -> int:
    """Send a JSON-RPC ``ping`` POST to an SSE session on behalf of `user`.

    The request targets ``/messages/`` with `session_id` carried in the query
    string. It is dispatched through ``transport.handle_post_message`` and the
    value returned is whatever ``_response_status`` derives from the fourth
    item produced by ``_sse_scope``.
    """
    ping_payload = b'{"jsonrpc": "2.0", "id": 1, "method": "ping", "params": null}'
    query = f"session_id={session_id}".encode()

    # _sse_scope hands back the three callables/objects the transport needs plus
    # a fourth value that is later inspected for the response status.
    request_scope, receive_fn, send_fn, captured = _sse_scope(
        "POST",
        "/messages/",
        user,
        query_string=query,
        body=ping_payload,
    )

    await transport.handle_post_message(request_scope, receive_fn, send_fn)

    status = _response_status(captured)
    return status
| 244 | |
| 245 | |
| 246 | _Principal = tuple[str] | tuple[str, str] | tuple[str, str, str] |
no test coverage detected