MCPcopy
hub / github.com/zeromq/pyzmq / test_proxy

Method test_proxy

tests/test_device.py:108–128  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

106 self.fail('Should have failed')
107
108 def test_proxy(self):
109 if zmq.zmq_version_info() < (3, 2):
110 raise SkipTest("Proxies only in libzmq >= 3")
111 dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
112 iface = 'tcp://127.0.0.1'
113 port = dev.bind_in_to_random_port(iface)
114 port2 = dev.bind_out_to_random_port(iface)
115 port3 = dev.bind_mon_to_random_port(iface)
116 dev.start()
117 time.sleep(0.25)
118 msg = b'hello'
119 push = self.context.socket(zmq.PUSH)
120 push.connect(f"{iface}:{port}")
121 pull = self.context.socket(zmq.PULL)
122 pull.connect(f"{iface}:{port2}")
123 mon = self.context.socket(zmq.PULL)
124 mon.connect(f"{iface}:{port3}")
125 push.send(msg)
126 self.sockets.extend([push, pull, mon])
127 assert msg == self.recv(pull)
128 assert msg == self.recv(mon)
129
130 def test_proxy_bind_to_random_with_args(self):
131 if zmq.zmq_version_info() < (3, 2):

Callers

nothing calls this directly

Calls 8

startMethod · 0.45
socketMethod · 0.45
connectMethod · 0.45
sendMethod · 0.45
recvMethod · 0.45

Tested by

no test coverage detected