MCPcopy
hub / github.com/zeromq/pyzmq / test_monitor

Function test_monitor

tests/test_monitor.py:22–55  ·  view source on GitHub ↗

Test monitoring interface for sockets.

(context, socket)

Source from the content-addressed store, hash-verified

20
21
async def test_monitor(context, socket):
    """Check that socket monitor events arrive in the expected order.

    A REP socket is monitored over an inproc endpoint while it connects
    to a REQ socket bound on a fixed local TCP port. The test expects an
    optional ``EVENT_CONNECT_DELAYED`` followed by ``EVENT_CONNECTED``,
    and finally ``EVENT_MONITOR_STOPPED`` once monitoring is disabled.

    Args:
        context: only inspected to see whether it is a
            ``zmq.asyncio.Context``, in which case monitor messages
            are awaited.
        socket: callable invoked with a zmq socket type to obtain a
            socket (presumably a pytest fixture -- confirm in conftest).
    """
    tcp_addr = "tcp://127.0.0.1:6666"
    tcp_addr_bytes = b"tcp://127.0.0.1:6666"
    monitor_addr = "inproc://monitor.rep"

    async def next_event():
        # The result is awaited only when running under an asyncio context.
        message = recv_monitor_message(s_event)
        if isinstance(context, zmq.asyncio.Context):
            message = await message
        return message

    s_rep = socket(zmq.REP)
    s_req = socket(zmq.REQ)
    s_req.bind(tcp_addr)

    # Watch the REP side for connection progress and monitor shutdown.
    watched_events = (
        zmq.EVENT_CONNECT_DELAYED | zmq.EVENT_CONNECTED | zmq.EVENT_MONITOR_STOPPED
    )
    s_rep.monitor(monitor_addr, watched_events)

    # PAIR socket on which the monitor events are read.
    s_event = socket(zmq.PAIR)
    s_event.connect(monitor_addr)
    s_event.linger = 0

    # Connecting the monitored socket should emit the connect events.
    s_rep.connect(tcp_addr)
    evt = await next_event()
    if evt['event'] == zmq.EVENT_CONNECT_DELAYED:
        # A delayed notice may come first; the connected event follows it.
        assert evt['endpoint'] == tcp_addr_bytes
        evt = await next_event()
    assert evt['event'] == zmq.EVENT_CONNECTED
    assert evt['endpoint'] == tcp_addr_bytes

    # Turning the monitor off should emit a final "stopped" event.
    s_rep.disable_monitor()
    evt = await next_event()
    assert evt['event'] == zmq.EVENT_MONITOR_STOPPED
56
57
58async def test_monitor_repeat(context, socket, sockets):

Callers

nothing calls this directly

Calls 6

recv_monitor_message · Function · 0.90
disable_monitor · Method · 0.80
socket · Function · 0.70
bind · Method · 0.45
monitor · Method · 0.45
connect · Method · 0.45

Tested by

no test coverage detected

Used in the wild — real call sites across dependent graphs

searching dependent graphs…