(self)
| 64 | self.device.join(timeout=5) |
| 65 | |
def test_reply(self):
    """Check that a message passes unchanged in both directions.

    Sends the two-frame message ``[b"hello", b"bob"]`` from alice and
    asserts bob receives it, then sends ``[b"hello", b"alice"]`` from bob
    and asserts alice receives it.
    """
    # The third value returned by build_device() is not used by this test.
    alice, bob, _mon = self.build_device()
    try:
        alices = b"hello bob".split()
        alice.send_multipart(alices)
        bobs = self.recv_multipart(bob)
        assert alices == bobs
        bobs = b"hello alice".split()
        bob.send_multipart(bobs)
        alices = self.recv_multipart(alice)
        assert alices == bobs
    finally:
        # Always tear down, even when an assertion or a receive fails, so
        # that whatever build_device() set up is not left behind for the
        # tests that run afterwards.
        self.teardown_device()
| 77 | |
| 78 | def test_queue(self): |
| 79 | alice, bob, mon = self.build_device() |
nothing calls this directly
no test coverage detected