| 90 | pipe.send((failed, collector.logs, ret)) |
| 91 | |
| 92 | def add_task( |
| 93 | self, |
| 94 | task_func: Callable[[Any], Any] | Callable[[], Any], |
| 95 | arg: Any = None, |
| 96 | result_func: Callable[[Any, Any], Any] | None = None, |
| 97 | ) -> None: |
| 98 | tid = self._taskid |
| 99 | self._taskid += 1 |
| 100 | self._result_funcs[tid] = result_func or (lambda arg, result: None) |
| 101 | self._args[tid] = arg |
| 102 | precv, psend = multiprocessing.Pipe(False) |
| 103 | context: Any = multiprocessing.get_context('fork') |
| 104 | proc = context.Process(target=self._process, args=(psend, task_func, arg)) # ty: ignore[unresolved-attribute] |
| 105 | self._procs[tid] = proc |
| 106 | self._precvs_waiting[tid] = precv |
| 107 | try: |
| 108 | self._join_one() |
| 109 | except Exception: |
| 110 | # shutdown other child processes on failure |
| 111 | # (e.g. OSError: Failed to allocate memory) |
| 112 | self.terminate() |
| 113 | |
| 114 | def join(self) -> None: |
| 115 | try: |