(driver: Driver, server_url: URL)
| 648 | indirect=True, |
| 649 | ) |
| 650 | async def test_websocket_client(driver: Driver, server_url: URL): |
| 651 | assert isinstance(driver, WebSocketClientMixin) |
| 652 | |
| 653 | request = Request("GET", server_url.with_scheme("ws")) |
| 654 | async with driver.websocket(request) as ws: |
| 655 | await ws.send("test") |
| 656 | assert await ws.receive() == "test" |
| 657 | |
| 658 | await ws.send(b"test") |
| 659 | assert await ws.receive() == b"test" |
| 660 | |
| 661 | await ws.send_text("test") |
| 662 | assert await ws.receive_text() == "test" |
| 663 | |
| 664 | await ws.send_bytes(b"test") |
| 665 | assert await ws.receive_bytes() == b"test" |
| 666 | |
| 667 | await ws.send("quit") |
| 668 | with pytest.raises(WebSocketClosed, match=r"code=1000"): |
| 669 | await ws.receive() |
| 670 | |
| 671 | await anyio.sleep(1) |
| 672 | |
| 673 | |
| 674 | @pytest.mark.anyio |
nothing calls this directly
no test coverage detected