MCPcopy
hub / github.com/idank/explainshell / run_parallel

Function run_parallel

explainshell/extraction/runner.py:187–252  ·  view source on GitHub ↗

Run extractor on files using a rolling thread pool. Keeps at most ``jobs`` tasks submitted at a time so a late fatal error does not leave the entire remaining corpus pre-queued. ``on_start`` runs in worker threads. ``on_result`` runs in the main thread. Callback exceptions are treated as fatal.

(
    extractor: Extractor,
    gz_files: list[str],
    jobs: int,
    on_start: Callable[[str], None] | None = None,
    on_result: Callable[[str, ExtractionResult], None] | None = None,
)

Source from the content-addressed store, hash-verified

185
186
187def run_parallel(
188 extractor: Extractor,
189 gz_files: list[str],
190 jobs: int,
191 on_start: Callable[[str], None] | None = None,
192 on_result: Callable[[str, ExtractionResult], None] | None = None,
193) -> BatchResult:
194 """Run extractor on files using a rolling thread pool.
195
196 Keeps at most ``jobs`` tasks submitted at a time so a late fatal error
197 does not leave the entire remaining corpus pre-queued.
198
199 ``on_start`` runs in worker threads. ``on_result`` runs in the main thread.
200 Callback exceptions are treated as fatal.
201 """
202 batch = BatchResult()
203
204 def _do_one(gz_path: str) -> ExtractionResult:
205 if on_start:
206 on_start(gz_path)
207 return _extract_one(extractor, gz_path)
208
209 executor = concurrent.futures.ThreadPoolExecutor(max_workers=jobs)
210 try:
211 pending: set[concurrent.futures.Future[ExtractionResult]] = set()
212 gz_iter = iter(gz_files)
213
214 def _submit_next() -> bool:
215 gz_path = next(gz_iter, None)
216 if gz_path is None:
217 return False
218 pending.add(executor.submit(_do_one, gz_path))
219 return True
220
221 for _ in range(jobs):
222 if not _submit_next():
223 break
224
225 while pending:
226 done, _ = concurrent.futures.wait(
227 pending, return_when=concurrent.futures.FIRST_COMPLETED
228 )
229 for future in done:
230 pending.remove(future)
231 entry = future.result() # raises FatalExtractionError on fatal
232 _tally(batch, entry)
233 if on_result:
234 on_result(entry.gz_path, entry)
235 _submit_next()
236 except KeyboardInterrupt:
237 n_pending = sum(1 for f in pending if f.running())
238 logger.info(
239 "interrupted by user, waiting for %d pending request(s) to finish",
240 n_pending,
241 )
242 batch.interrupted = True
243 extractor.cancel()
244 executor.shutdown(wait=False, cancel_futures=True)

Callers 1

run · Function · 0.85

Calls 7

BatchResult · Class · 0.90
range · Function · 0.85
_submit_next · Function · 0.85
_tally · Function · 0.85
shutdown · Method · 0.80
on_result · Function · 0.50
cancel · Method · 0.45

Tested by

no test coverage detected