Main worker function
()
| 55 | |
| 56 | |
| 57 | def main(): |
| 58 | """Main worker function""" |
| 59 | if len(sys.argv) != 3: |
| 60 | print("Usage: <read_fd> <write_fd>") |
| 61 | return |
| 62 | if sys.platform == "win32": |
| 63 | # pylint: disable=import-outside-toplevel |
| 64 | import msvcrt |
| 65 | |
| 66 | reader = os.fdopen(msvcrt.open_osfhandle(int(sys.argv[1]), os.O_BINARY), "rb") |
| 67 | writer = os.fdopen(msvcrt.open_osfhandle(int(sys.argv[2]), os.O_BINARY), "wb") |
| 68 | else: |
| 69 | reader = os.fdopen(int(sys.argv[1]), "rb") |
| 70 | writer = os.fdopen(int(sys.argv[2]), "wb") |
| 71 | |
| 72 | logging.basicConfig(level=logging.INFO) |
| 73 | |
| 74 | lock = threading.Lock() |
| 75 | |
def _respond(ret_value):
    """Serialize ``ret_value`` and write it to the client as one framed message."""
    payload = cloudpickle.dumps(ret_value, protocol=pickle.HIGHEST_PROTOCOL)
    # Frame layout: 4-byte little-endian signed length, then the pickled bytes.
    header = struct.pack("<i", len(payload))
    for chunk in (header, payload):
        writer.write(chunk)
    # Flush so the frame is pushed out of the buffered writer immediately.
    writer.flush()
| 82 | |
def _cancel_run(status):
    """Report a timeout to the client if the current job is still running.

    Used as the callback of the ``threading.Timer`` watchdog, so it runs on
    the timer thread rather than the thread executing the job.

    Parameters
    ----------
    status
        Per-job status holder; its ``status`` attribute is read and, when the
        job is still ``StatusKind.RUNNING``, set to ``StatusKind.TIMEOUT``
        after the timeout response has been sent.
    """
    # Hold the lock through a context manager so it is always released, even
    # if _respond raises (e.g. the pipe to the parent is already closed).
    # A bare acquire()/release() pair would leave the lock held forever in
    # that case and block every later acquirer.
    with lock:
        if status.status == StatusKind.RUNNING:
            _respond((StatusKind.TIMEOUT, TimeoutError()))
            status.status = StatusKind.TIMEOUT
| 89 | |
| 90 | while True: |
| 91 | raw_bytes_size = reader.read(4) |
| 92 | if len(raw_bytes_size) != 4: |
| 93 | # the parent exited |
| 94 | return |
| 95 | bytes_size = struct.unpack("<i", raw_bytes_size)[0] |
| 96 | fn, args, kwargs, timeout = cloudpickle.loads(reader.read(bytes_size)) |
| 97 | status = TimeoutStatus() |
| 98 | |
| 99 | if timeout is not None: |
| 100 | watcher = threading.Timer(timeout, _cancel_run, [status]) |
| 101 | watcher.daemon = True |
| 102 | watcher.start() |
| 103 | |
| 104 | # pylint: disable=broad-except |
| 105 | try: |
| 106 | result = fn(*args, **kwargs) |
| 107 | ret_value = (StatusKind.COMPLETE, result) |
| 108 | except Exception as exception: |
| 109 | msg = traceback.format_exc() |
| 110 | ret_value = (StatusKind.EXCEPTION, type(exception)(msg)) |
| 111 | |
| 112 | if timeout is not None: |
| 113 | watcher.cancel() |
| 114 |
no test coverage detected
searching dependent graphs…