| 126 | |
@staticmethod
def send_worker(address, send_queue, worker_alive):
    """Forward messages from ``send_queue`` to a PUSH socket at ``address``.

    The loop runs for as long as ``worker_alive.value`` is truthy. Each
    message taken from the queue is unpacked and passed positionally to
    ``sender.send_data``; the time spent in that call is accumulated
    under the ``'SEND'`` key.

    Args:
        address: Endpoint the PUSH socket connects to (and later
            disconnects from).
        send_queue: Queue of argument tuples for ``send_data``. It must
            support ``get(timeout=...)`` and raise ``queue.Empty`` when
            nothing arrives in time.
        worker_alive: Shared flag object exposing ``.value``. It is read
            as the loop condition and always set to ``0`` on exit.
            NOTE(review): presumably a ``multiprocessing.Value`` --
            confirm against the caller.
    """
    timing = AccumDict()
    log = Logger('./var/log/send_worker.log', opt.verbose)

    ctx = SerializingContext()
    sender = ctx.socket(zmq.PUSH)
    sender.connect(address)

    log(f"Sending to {address}")

    try:
        while worker_alive.value:
            tt = TicToc()

            # Poll with a timeout instead of blocking forever so the
            # worker_alive flag is re-checked regularly.
            try:
                msg = send_queue.get(timeout=GET_TIMEOUT)
            except queue.Empty:
                continue

            tt.tic()
            sender.send_data(*msg)
            timing.add('SEND', tt.toc())

            if opt.verbose:
                Once(timing, log, per=1)
    except KeyboardInterrupt:
        log("send_worker: user interrupt")
    finally:
        # Clear the flag first so anything sharing it sees that this
        # worker has stopped.
        worker_alive.value = 0

        # Teardown lives inside ``finally`` so the socket and context are
        # released on every exit path, including exceptions other than
        # KeyboardInterrupt that propagate out of the loop.
        sender.disconnect(address)
        sender.close()
        ctx.destroy()
        log("send_worker exit")
| 162 | |
| 163 | @staticmethod |
| 164 | def recv_worker(address, recv_queue, worker_alive): |