(address: str)
| 171 | |
| 172 | |
| 173 | def serve(address: str) -> None: |
| 174 | interceptors = get_auth_interceptors() |
| 175 | server = grpc.server( |
| 176 | futures.ThreadPoolExecutor(max_workers=MAX_WORKERS), |
| 177 | interceptors=interceptors, |
| 178 | options=[ |
| 179 | ("grpc.max_send_message_length", 128 * 1024 * 1024), |
| 180 | ("grpc.max_receive_message_length", 128 * 1024 * 1024), |
| 181 | ], |
| 182 | ) |
| 183 | backend_pb2_grpc.add_BackendServicer_to_server(BackendServicer(), server) |
| 184 | server.add_insecure_port(address) |
| 185 | server.start() |
| 186 | print("speaker-recognition backend listening on", address, flush=True) |
| 187 | |
| 188 | def _stop(*_): |
| 189 | server.stop(0) |
| 190 | sys.exit(0) |
| 191 | |
| 192 | signal.signal(signal.SIGTERM, _stop) |
| 193 | signal.signal(signal.SIGINT, _stop) |
| 194 | try: |
| 195 | while True: |
| 196 | time.sleep(_ONE_DAY) |
| 197 | except KeyboardInterrupt: |
| 198 | server.stop(0) |
| 199 | |
| 200 | |
| 201 | if __name__ == "__main__": |
no test coverage detected