MCPcopy
hub / github.com/sphinx-doc/sphinx / ParallelTasks

Class ParallelTasks

sphinx/util/parallel.py:54–153  ·  view source on GitHub ↗

Executes *nproc* tasks in parallel after forking.

Source from the content-addressed store, hash-verified

52
53
54class ParallelTasks:
55 """Executes *nproc* tasks in parallel after forking."""
56
57 def __init__(self, nproc: int) -> None:
58 self.nproc = nproc
59 # (optional) function performed by each task on the result of main task
60 self._result_funcs: dict[int, Callable[[Any, Any], Any]] = {}
61 # task arguments
62 self._args: dict[int, list[Any] | None] = {}
63 # list of subprocesses (both started and waiting)
64 self._procs: dict[int, Any] = {}
65 # list of receiving pipe connections of running subprocesses
66 self._precvs: dict[int, Any] = {}
67 # list of receiving pipe connections of waiting subprocesses
68 self._precvs_waiting: dict[int, Any] = {}
69 # number of working subprocesses
70 self._pworking = 0
71 # task number of each subprocess
72 self._taskid = 0
73
def _process(
    self, pipe: Any, func: Callable[[Any], Any] | Callable[[], Any], arg: Any
) -> None:
    """Run *func* once and report the outcome through *pipe*.

    *func* is called with *arg*, or with no arguments when *arg* is
    ``None``. Log records emitted during the call are captured by a
    ``LogCollector``. Exactly one message is then sent on *pipe*: the
    tuple ``(failed, logs, result)``. On success ``result`` is the return
    value of *func*; on failure it is ``(summary, full_traceback)``.
    """
    try:
        log_collector = logging.LogCollector()
        with log_collector.collect():
            if arg is not None:
                outcome = func(arg)  # type: ignore[call-arg]
            else:
                outcome = func()  # type: ignore[call-arg]
        has_failed = False
    except BaseException as err:
        # BaseException is caught on purpose so that every failure is
        # reported through the pipe instead of ending the call silently.
        has_failed = True
        formatted = traceback.format_exception_only(err.__class__, err)
        summary = formatted[0].strip()
        outcome = (summary, traceback.format_exc())
    captured = log_collector.logs
    # NOTE(review): presumably rewrites the records in place so they can be
    # pickled for the pipe -- confirm against ``convert_serializable``.
    logging.convert_serializable(captured)
    pipe.send((has_failed, captured, outcome))
91
92 def add_task(
93 self,
94 task_func: Callable[[Any], Any] | Callable[[], Any],
95 arg: Any = None,
96 result_func: Callable[[Any, Any], Any] | None = None,
97 ) -> None:
98 tid = self._taskid
99 self._taskid += 1
100 self._result_funcs[tid] = result_func or (lambda arg, result: None)
101 self._args[tid] = arg
102 precv, psend = multiprocessing.Pipe(False)
103 context: Any = multiprocessing.get_context('fork')
104 proc = context.Process(target=self._process, args=(psend, task_func, arg)) # ty: ignore[unresolved-attribute]
105 self._procs[tid] = proc
106 self._precvs_waiting[tid] = precv
107 try:
108 self._join_one()
109 except Exception:
110 # shutdown other child processes on failure
111 # (e.g. OSError: Failed to allocate memory)

Callers 3

_read_parallel · Method · 0.90
_write_parallel · Method · 0.90

Calls

no outgoing calls

Tested by 1

Used in the wild: real call sites across dependent graphs

searching dependent graphs…