MCPcopy
hub / github.com/python-trio/trio / test_basic_udp

Function test_basic_udp

src/trio/_tests/test_fakenet.py:26–58  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

24
25
26async def test_basic_udp() -> None:
27 fn()
28 s1 = trio.socket.socket(type=trio.socket.SOCK_DGRAM)
29 s2 = trio.socket.socket(type=trio.socket.SOCK_DGRAM)
30
31 await s1.bind(("127.0.0.1", 0))
32 ip, port = s1.getsockname()
33 assert ip == "127.0.0.1"
34 assert port != 0
35
36 with pytest.raises(
37 OSError,
38 match=r"^\[\w+ \d+\] Invalid argument$",
39 ) as exc: # Cannot rebind.
40 await s1.bind(("192.0.2.1", 0))
41 assert exc.value.errno == errno.EINVAL
42
43 # Cannot bind multiple sockets to the same address
44 with pytest.raises(
45 OSError,
46 match=r"^\[\w+ \d+\] (Address (already )?in use|Unknown error)$",
47 ) as exc:
48 await s2.bind(("127.0.0.1", port))
49 assert exc.value.errno == errno.EADDRINUSE
50
51 await s2.sendto(b"xyz", s1.getsockname())
52 data, addr = await s1.recvfrom(10)
53 assert data == b"xyz"
54 assert addr == s2.getsockname()
55 await s1.sendto(b"abc", s2.getsockname())
56 data, addr = await s2.recvfrom(10)
57 assert data == b"abc"
58 assert addr == s1.getsockname()
59
60
61async def test_msg_trunc() -> None:

Callers

nothing calls this directly

Calls 6

fnFunction · 0.70
socketMethod · 0.45
bindMethod · 0.45
getsocknameMethod · 0.45
sendtoMethod · 0.45
recvfromMethod · 0.45

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…