hub / github.com/zeromq/pyzmq / test_queue

Method test_queue

tests/test_monqueue.py:78–96
(self)

Source from the content-addressed store, hash-verified

        self.teardown_device()

    def test_queue(self):
        alice, bob, mon = self.build_device()
        alices = b"hello bob".split()
        alice.send_multipart(alices)
        alices2 = b"hello again".split()
        alice.send_multipart(alices2)
        alices3 = b"hello again and again".split()
        alice.send_multipart(alices3)
        bobs = self.recv_multipart(bob)
        assert alices == bobs
        bobs = self.recv_multipart(bob)
        assert alices2 == bobs
        bobs = self.recv_multipart(bob)
        assert alices3 == bobs
        bobs = b"hello alice".split()
        bob.send_multipart(bobs)
        alices = self.recv_multipart(alice)
        assert alices == bobs
        self.teardown_device()

    def test_monitor(self):
        alice, bob, mon = self.build_device()
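
The multipart messages in this test are built with bytes.split(), which splits on whitespace and yields one frame per word. A minimal sketch of just that step, using only the standard library; the variable names mirror the test, frame_counts is a hypothetical helper added for illustration, and no sockets or pyzmq device are involved:

```python
# Each message is a list of byte-string frames, one per whitespace-separated word.
alices = b"hello bob".split()
alices2 = b"hello again".split()
alices3 = b"hello again and again".split()

# frame_counts is illustrative only; it does not appear in the test.
frame_counts = [len(alices), len(alices2), len(alices3)]

print(alices)        # [b'hello', b'bob']
print(frame_counts)  # [2, 2, 4]
```

Because each assertion in the test compares whole lists, a message passes only if it arrives with the same frames in the same order.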

Callers

nothing calls this directly

Calls 4

build_device · Method · 0.95
teardown_device · Method · 0.95
send_multipart · Method · 0.45
recv_multipart · Method · 0.45

Tested by

no test coverage detected