| 57 | |
@staticmethod
def recv_worker(port, recv_queue, worker_alive):
    """Receive messages on a ZeroMQ PULL socket and feed them to ``recv_queue``.

    Binds to ``tcp://*:{port}`` and loops while ``worker_alive.value`` is
    truthy. Every received message is unpacked as ``(method, data)``:

    * if ``method['critical']`` is truthy the message is enqueued with a
      blocking ``put`` (it is never dropped);
    * otherwise it is enqueued without blocking and dropped, with a log
      line, when the queue is full.

    On every exit path (flag cleared, Ctrl-C, or any other exception) the
    worker sets ``worker_alive.value`` to 0, closes the socket, terminates
    the context and logs an exit line. ``KeyboardInterrupt`` is swallowed
    after being logged; any other exception still propagates to the caller
    once cleanup has run.

    Args:
        port: TCP port number to bind the PULL socket to.
        recv_queue: Queue-like object supporting ``put(msg)`` and
            ``put(msg, block=False)``; the latter is expected to raise
            ``queue.Full`` when no slot is free.
        worker_alive: Object exposing a readable/writable ``value``
            attribute used as the shared "keep running" flag.
    """
    timing = AccumDict()
    log = Logger('./var/log/recv_worker.log', verbose=opt.verbose)

    ctx = SerializingContext()
    socket = ctx.socket(zmq.PULL)

    try:
        # Bind inside the try so that a failed bind still runs the cleanup
        # below instead of leaving the shared flag set.
        socket.bind(f"tcp://*:{port}")
        # A receive timeout makes recv_data() raise zmq.error.Again, which
        # lets the loop re-check worker_alive instead of blocking forever.
        socket.RCVTIMEO = RECV_TIMEOUT

        log(f'Receiving on port {port}', important=True)

        while worker_alive.value:
            tt = TicToc()

            try:
                tt.tic()
                msg = socket.recv_data()
            except zmq.error.Again:
                log("recv timeout")
                continue
            else:
                # Only successful receives contribute to the RECV timing.
                timing.add('RECV', tt.toc())

            # The unpack also rejects messages that are not 2-item sequences.
            method, _data = msg
            if method['critical']:
                # NOTE(review): this put blocks without a timeout, so a full
                # queue with no consumer keeps the worker here even after
                # worker_alive is cleared elsewhere -- confirm this is intended.
                recv_queue.put(msg)
            else:
                try:
                    recv_queue.put(msg, block=False)
                except queue.Full:
                    # Non-critical messages are dropped rather than stalling
                    # the receive loop.
                    log('recv_queue full')

            # Presumably rate-limits reporting of the accumulated timings;
            # verify against the Once helper.
            Once(timing, log, per=1)
    except KeyboardInterrupt:
        log("recv_worker: user interrupt", important=True)
    finally:
        # Clear the shared flag first: it must be reset on every exit path,
        # including unexpected exceptions, even if closing the socket fails.
        worker_alive.value = 0
        # linger=0: a PULL socket has nothing to flush, so release the port
        # immediately; term() then returns without waiting.
        socket.close(linger=0)
        ctx.term()
        log("recv_worker exit", important=True)
| 99 | |
| 100 | @staticmethod |
| 101 | def predictor_worker(recv_queue, send_queue, worker_alive): |