MCPcopy
hub / github.com/zeromq/pyzmq / test_monitor

Method test_monitor

tests/test_monqueue.py:98–124  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

96 self.teardown_device()
97
98 def test_monitor(self):
99 alice, bob, mon = self.build_device()
100 alices = b"hello bob".split()
101 alice.send_multipart(alices)
102 alices2 = b"hello again".split()
103 alice.send_multipart(alices2)
104 alices3 = b"hello again and again".split()
105 alice.send_multipart(alices3)
106 bobs = self.recv_multipart(bob)
107 assert alices == bobs
108 mons = self.recv_multipart(mon)
109 assert [b'in'] + bobs == mons
110 bobs = self.recv_multipart(bob)
111 assert alices2 == bobs
112 bobs = self.recv_multipart(bob)
113 assert alices3 == bobs
114 mons = self.recv_multipart(mon)
115 assert [b'in'] + alices2 == mons
116 bobs = b"hello alice".split()
117 bob.send_multipart(bobs)
118 alices = self.recv_multipart(alice)
119 assert alices == bobs
120 mons = self.recv_multipart(mon)
121 assert [b'in'] + alices3 == mons
122 mons = self.recv_multipart(mon)
123 assert [b'out'] + bobs == mons
124 self.teardown_device()
125
126 def test_prefix(self):
127 alice, bob, mon = self.build_device(b"", b'foo', b'bar')

Callers

nothing calls this directly

Calls 4

build_deviceMethod · 0.95
teardown_deviceMethod · 0.95
send_multipartMethod · 0.45
recv_multipartMethod · 0.45

Tested by

no test coverage detected