(self)
| 19 | Poller = zmq.Poller |
| 20 | |
| 21 | def test_pair(self): |
| 22 | s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR) |
| 23 | |
| 24 | # Sleep to allow sockets to connect. |
| 25 | wait() |
| 26 | |
| 27 | poller = self.Poller() |
| 28 | poller.register(s1, zmq.POLLIN | zmq.POLLOUT) |
| 29 | poller.register(s2, zmq.POLLIN | zmq.POLLOUT) |
| 30 | # Poll result should contain both sockets |
| 31 | socks = dict(poller.poll()) |
| 32 | # Now make sure that both are send ready. |
| 33 | assert socks[s1] == zmq.POLLOUT |
| 34 | assert socks[s2] == zmq.POLLOUT |
| 35 | # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN |
| 36 | s1.send(b'msg1') |
| 37 | s2.send(b'msg2') |
| 38 | wait() |
| 39 | socks = dict(poller.poll()) |
| 40 | assert socks[s1] == zmq.POLLOUT | zmq.POLLIN |
| 41 | assert socks[s2] == zmq.POLLOUT | zmq.POLLIN |
| 42 | # Make sure that both are in POLLOUT after recv. |
| 43 | s1.recv() |
| 44 | s2.recv() |
| 45 | socks = dict(poller.poll()) |
| 46 | assert socks[s1] == zmq.POLLOUT |
| 47 | assert socks[s2] == zmq.POLLOUT |
| 48 | |
| 49 | poller.unregister(s1) |
| 50 | poller.unregister(s2) |
| 51 | |
| 52 | def test_reqrep(self): |
| 53 | s1, s2 = self.create_bound_pair(zmq.REP, zmq.REQ) |
nothing calls this directly
no test coverage detected