MCPcopy
hub / github.com/nonebot/nonebot2 / _handle_ws

Function _handle_ws

tests/test_driver.py:122–141  ·  view source on GitHub ↗
(ws: WebSocket)

Source from the content-addressed store, hash-verified

120 assert isinstance(driver, ASGIMixin)
121
122 async def _handle_ws(ws: WebSocket) -> None:
123 await ws.accept()
124 data = await ws.receive()
125 assert data == "ping"
126 await ws.send("pong")
127
128 data = await ws.receive()
129 assert data == b"ping"
130 await ws.send(b"pong")
131
132 data = await ws.receive_text()
133 assert data == "ping"
134 await ws.send("pong")
135
136 data = await ws.receive_bytes()
137 assert data == b"ping"
138 await ws.send(b"pong")
139
140 with pytest.raises(WebSocketClosed, match=r"code=1000"):
141 await ws.receive()
142
143 ws_setup = WebSocketServerSetup(URL("/ws_test"), "ws_test", _handle_ws)
144 driver.setup_websocket_server(ws_setup)

Callers

nothing calls this directly

Calls 8

setMethod · 0.80
waitMethod · 0.80
acceptMethod · 0.45
receiveMethod · 0.45
sendMethod · 0.45
receive_textMethod · 0.45
receive_bytesMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected