(self)
| 106 | self.fail('Should have failed') |
| 107 | |
| 108 | def test_proxy(self): |
| 109 | if zmq.zmq_version_info() < (3, 2): |
| 110 | raise SkipTest("Proxies only in libzmq >= 3") |
| 111 | dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH) |
| 112 | iface = 'tcp://127.0.0.1' |
| 113 | port = dev.bind_in_to_random_port(iface) |
| 114 | port2 = dev.bind_out_to_random_port(iface) |
| 115 | port3 = dev.bind_mon_to_random_port(iface) |
| 116 | dev.start() |
| 117 | time.sleep(0.25) |
| 118 | msg = b'hello' |
| 119 | push = self.context.socket(zmq.PUSH) |
| 120 | push.connect(f"{iface}:{port}") |
| 121 | pull = self.context.socket(zmq.PULL) |
| 122 | pull.connect(f"{iface}:{port2}") |
| 123 | mon = self.context.socket(zmq.PULL) |
| 124 | mon.connect(f"{iface}:{port3}") |
| 125 | push.send(msg) |
| 126 | self.sockets.extend([push, pull, mon]) |
| 127 | assert msg == self.recv(pull) |
| 128 | assert msg == self.recv(mon) |
| 129 | |
| 130 | def test_proxy_bind_to_random_with_args(self): |
| 131 | if zmq.zmq_version_info() < (3, 2): |
nothing calls this directly
no test coverage detected