| 24 | |
| 25 | |
async def test_basic_udp() -> None:
    """Check UDP bind semantics and a two-way datagram exchange.

    The test binds one datagram socket to an ephemeral loopback port,
    verifies that binding it a second time fails with ``EINVAL``, verifies
    that a second socket cannot bind to the already-used address
    (``EADDRINUSE``), and finally sends one datagram in each direction,
    checking both the payload and the reported source address.
    """
    # NOTE(review): fn() is defined elsewhere in this file; presumably it
    # prepares the network environment this test relies on -- confirm.
    fn()

    dgram = trio.socket.SOCK_DGRAM
    bound_sock = trio.socket.socket(type=dgram)
    other_sock = trio.socket.socket(type=dgram)

    # Port 0 requests an ephemeral port; a concrete one must be reported back.
    await bound_sock.bind(("127.0.0.1", 0))
    host, port = bound_sock.getsockname()
    assert host == "127.0.0.1"
    assert port != 0

    # Cannot rebind.
    rebind_pattern = r"^\[\w+ \d+\] Invalid argument$"
    with pytest.raises(OSError, match=rebind_pattern) as rebind_err:
        await bound_sock.bind(("192.0.2.1", 0))
    assert rebind_err.value.errno == errno.EINVAL

    # Cannot bind multiple sockets to the same address
    in_use_pattern = r"^\[\w+ \d+\] (Address (already )?in use|Unknown error)$"
    with pytest.raises(OSError, match=in_use_pattern) as in_use_err:
        await other_sock.bind(("127.0.0.1", port))
    assert in_use_err.value.errno == errno.EADDRINUSE

    # other_sock -> bound_sock: the payload arrives intact and the reported
    # source address matches what other_sock says its own address is.
    await other_sock.sendto(b"xyz", bound_sock.getsockname())
    payload, source = await bound_sock.recvfrom(10)
    assert payload == b"xyz"
    assert source == other_sock.getsockname()

    # bound_sock -> other_sock: same checks in the reverse direction.
    await bound_sock.sendto(b"abc", other_sock.getsockname())
    payload, source = await other_sock.recvfrom(10)
    assert payload == b"abc"
    assert source == bound_sock.getsockname()
| 59 | |
| 60 | |
| 61 | async def test_msg_trunc() -> None: |