hub / github.com/zeromq/pyzmq / test_pair

Method test_pair

tests/test_poll.py:21–50
(self)

Source from the content-addressed store, hash-verified

    Poller = zmq.Poller

    def test_pair(self):
        s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)

        # Sleep to allow sockets to connect.
        wait()

        poller = self.Poller()
        poller.register(s1, zmq.POLLIN | zmq.POLLOUT)
        poller.register(s2, zmq.POLLIN | zmq.POLLOUT)
        # Poll result should contain both sockets
        socks = dict(poller.poll())
        # Now make sure that both are send ready.
        assert socks[s1] == zmq.POLLOUT
        assert socks[s2] == zmq.POLLOUT
        # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
        s1.send(b'msg1')
        s2.send(b'msg2')
        wait()
        socks = dict(poller.poll())
        assert socks[s1] == zmq.POLLOUT | zmq.POLLIN
        assert socks[s2] == zmq.POLLOUT | zmq.POLLIN
        # Make sure that both are in POLLOUT after recv.
        s1.recv()
        s2.recv()
        socks = dict(poller.poll())
        assert socks[s1] == zmq.POLLOUT
        assert socks[s2] == zmq.POLLOUT

        poller.unregister(s1)
        poller.unregister(s2)

    def test_reqrep(self):
        s1, s2 = self.create_bound_pair(zmq.REP, zmq.REQ)
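
The test above depends on harness helpers (`create_bound_pair`, `wait`) that are not shown on this page. As a rough standalone sketch of the same polling sequence, the snippet below connects two PAIR sockets and records their event masks at the three points the test checks. The function name `poll_pair_states`, the `inproc://` endpoint, and the 1000 ms poll timeout are assumptions made for illustration; they are not taken from the pyzmq test suite, which may set up its sockets differently.

```python
import zmq


def poll_pair_states(endpoint="inproc://poll-pair-demo"):
    """Return (s1, s2) event masks before send, after send, and after recv.

    Hypothetical helper: name and endpoint are illustrative only.
    """
    ctx = zmq.Context()
    s1 = ctx.socket(zmq.PAIR)
    s2 = ctx.socket(zmq.PAIR)
    try:
        # inproc is assumed here so that no sleep is needed for the connection.
        s1.bind(endpoint)
        s2.connect(endpoint)

        poller = zmq.Poller()
        poller.register(s1, zmq.POLLIN | zmq.POLLOUT)
        poller.register(s2, zmq.POLLIN | zmq.POLLOUT)

        def snapshot():
            # poll() returns (socket, events) pairs; sockets with no events are absent.
            events = dict(poller.poll(1000))
            return events.get(s1, 0), events.get(s2, 0)

        states = [snapshot()]      # nothing queued: both should be send-ready only
        s1.send(b"msg1")
        s2.send(b"msg2")
        states.append(snapshot())  # one message pending on each side
        s1.recv()
        s2.recv()
        states.append(snapshot())  # queues drained again
        return states
    finally:
        s1.close(linger=0)
        s2.close(linger=0)
        ctx.term()


if __name__ == "__main__":
    for before_s1, before_s2 in poll_pair_states():
        print(int(before_s1), int(before_s2))
```

Each mask is a bit field, so comparing with `==` as the test does checks that exactly those flags are set; a check such as `mask & zmq.POLLIN` would only test for readability.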

Callers

nothing calls this directly

Calls 7

register · Method · 0.95
poll · Method · 0.95
unregister · Method · 0.95
wait · Function · 0.85
create_bound_pair · Method · 0.45
send · Method · 0.45
recv · Method · 0.45

Tested by

no test coverage detected