| 99 | |
| 100 | @staticmethod |
| 101 | def predictor_worker(recv_queue, send_queue, worker_alive): |
| 102 | predictor = None |
| 103 | predictor_args = () |
| 104 | timing = AccumDict() |
| 105 | log = Logger('./var/log/predictor_worker.log', verbose=opt.verbose) |
| 106 | |
| 107 | try: |
| 108 | while worker_alive.value: |
| 109 | tt = TicToc() |
| 110 | |
| 111 | try: |
| 112 | method, data = recv_queue.get(timeout=GET_TIMEOUT) |
| 113 | except queue.Empty: |
| 114 | continue |
| 115 | |
| 116 | # skip ahead to the newest queued request, dropping stale non-critical ones; |
| 117 | # never skip a critical request: stop draining as soon as one is reached |
| 118 | while not recv_queue.empty() and not method['critical']: |
| 119 | log(f"skip {method}") |
| 120 | method, data = recv_queue.get() |
| 121 | |
| 122 | log("working on", method) |
| 123 | |
| 124 | try: |
| 125 | tt.tic() |
| 126 | if method['name'] == 'predict': |
| 127 | image = cv2.imdecode(np.frombuffer(data, dtype='uint8'), -1) |
| 128 | else: |
| 129 | args = msgpack.unpackb(data) |
| 130 | timing.add('UNPACK', tt.toc()) |
| 131 | except ValueError: |
| 132 | log("Invalid Message", important=True) |
| 133 | continue |
| 134 | |
| 135 | tt.tic() |
| 136 | if method['name'] == "hello": |
| 137 | result = "OK" |
| 138 | elif method['name'] == "__init__": |
| 139 | if args == predictor_args: |
| 140 | log("Same config as before... reusing previous predictor") |
| 141 | else: |
| 142 | del predictor |
| 143 | predictor_args = args |
| 144 | predictor = PredictorLocal(*predictor_args[0], **predictor_args[1]) |
| 145 | log("Initialized predictor with:", predictor_args, important=True) |
| 146 | result = True |
| 147 | tt.tic() # don't account for init |
| 148 | elif method['name'] == 'predict': |
| 149 | assert predictor is not None, "Predictor was not initialized" |
| 150 | result = getattr(predictor, method['name'])(image) |
| 151 | else: |
| 152 | assert predictor is not None, "Predictor was not initialized" |
| 153 | result = getattr(predictor, method['name'])(*args[0], **args[1]) |
| 154 | timing.add('CALL', tt.toc()) |
| 155 | |
| 156 | tt.tic() |
| 157 | if method['name'] == 'predict': |
| 158 | assert isinstance(result, np.ndarray), f'Expected np.ndarray, got {result.__class__}' |