(ws: WebSocket)
| 120 | assert isinstance(driver, ASGIMixin) |
| 121 | |
async def _handle_ws(ws: WebSocket) -> None:
    """Server-side handler driving the WebSocket round-trip test.

    Accepts the connection, then answers four "ping" messages in turn:
    text and bytes read through the generic ``receive()``, then text read
    through ``receive_text()`` and bytes through ``receive_bytes()``.  Each
    is answered with a "pong" of the same type.  Finally, the next
    ``receive()`` is expected to raise ``WebSocketClosed`` with a message
    matching ``code=1000``.
    """
    await ws.accept()

    # (receiver, expected incoming payload, reply) -- the order mirrors the
    # sequence of messages exchanged with the peer and must not change.
    exchanges = (
        (ws.receive, "ping", "pong"),
        (ws.receive, b"ping", b"pong"),
        (ws.receive_text, "ping", "pong"),
        (ws.receive_bytes, b"ping", b"pong"),
    )
    for receive, expected, reply in exchanges:
        incoming = await receive()
        assert incoming == expected
        await ws.send(reply)

    # After the last exchange the handler expects the connection to be
    # closed, surfacing as WebSocketClosed on the next read.
    with pytest.raises(WebSocketClosed, match=r"code=1000"):
        await ws.receive()
| 142 | |
| 143 | ws_setup = WebSocketServerSetup(URL("/ws_test"), "ws_test", _handle_ws) |
| 144 | driver.setup_websocket_server(ws_setup) |
nothing calls this directly
no test coverage detected