Test monitoring interface for sockets.
(context, socket)
| 20 | |
| 21 | |
| 22 | async def test_monitor(context, socket): |
| 23 | """Test monitoring interface for sockets.""" |
| 24 | s_rep = socket(zmq.REP) |
| 25 | s_req = socket(zmq.REQ) |
| 26 | s_req.bind("tcp://127.0.0.1:6666") |
| 27 | # try monitoring the REP socket |
| 28 | s_rep.monitor( |
| 29 | "inproc://monitor.rep", |
| 30 | zmq.EVENT_CONNECT_DELAYED | zmq.EVENT_CONNECTED | zmq.EVENT_MONITOR_STOPPED, |
| 31 | ) |
| 32 | # create listening socket for monitor |
| 33 | s_event = socket(zmq.PAIR) |
| 34 | s_event.connect("inproc://monitor.rep") |
| 35 | s_event.linger = 0 |
| 36 | # test receive event for connect event |
| 37 | s_rep.connect("tcp://127.0.0.1:6666") |
| 38 | m = recv_monitor_message(s_event) |
| 39 | if isinstance(context, zmq.asyncio.Context): |
| 40 | m = await m |
| 41 | if m['event'] == zmq.EVENT_CONNECT_DELAYED: |
| 42 | assert m['endpoint'] == b"tcp://127.0.0.1:6666" |
| 43 | # test receive event for connected event |
| 44 | m = recv_monitor_message(s_event) |
| 45 | if isinstance(context, zmq.asyncio.Context): |
| 46 | m = await m |
| 47 | assert m['event'] == zmq.EVENT_CONNECTED |
| 48 | assert m['endpoint'] == b"tcp://127.0.0.1:6666" |
| 49 | |
| 50 | # test monitor can be disabled. |
| 51 | s_rep.disable_monitor() |
| 52 | m = recv_monitor_message(s_event) |
| 53 | if isinstance(context, zmq.asyncio.Context): |
| 54 | m = await m |
| 55 | assert m['event'] == zmq.EVENT_MONITOR_STOPPED |
| 56 | |
| 57 | |
| 58 | async def test_monitor_repeat(context, socket, sockets): |
nothing calls this directly
no test coverage detected
searching dependent graphs…