Run extractor on files using a rolling thread pool. Keeps at most ``jobs`` tasks submitted at a time so a late fatal error does not leave the entire remaining corpus pre-queued. ``on_start`` runs in worker threads. ``on_result`` runs in the main thread. Callback exceptions are treated as fatal.
(
extractor: Extractor,
gz_files: list[str],
jobs: int,
on_start: Callable[[str], None] | None = None,
on_result: Callable[[str, ExtractionResult], None] | None = None,
)
| 185 | |
| 186 | |
| 187 | def run_parallel( |
| 188 | extractor: Extractor, |
| 189 | gz_files: list[str], |
| 190 | jobs: int, |
| 191 | on_start: Callable[[str], None] | None = None, |
| 192 | on_result: Callable[[str, ExtractionResult], None] | None = None, |
| 193 | ) -> BatchResult: |
| 194 | """Run extractor on files using a rolling thread pool. |
| 195 | |
| 196 | Keeps at most ``jobs`` tasks submitted at a time so a late fatal error |
| 197 | does not leave the entire remaining corpus pre-queued. |
| 198 | |
| 199 | ``on_start`` runs in worker threads. ``on_result`` runs in the main thread. |
| 200 | Callback exceptions are treated as fatal. |
| 201 | """ |
| 202 | batch = BatchResult() |
| 203 | |
| 204 | def _do_one(gz_path: str) -> ExtractionResult: |
| 205 | if on_start: |
| 206 | on_start(gz_path) |
| 207 | return _extract_one(extractor, gz_path) |
| 208 | |
| 209 | executor = concurrent.futures.ThreadPoolExecutor(max_workers=jobs) |
| 210 | try: |
| 211 | pending: set[concurrent.futures.Future[ExtractionResult]] = set() |
| 212 | gz_iter = iter(gz_files) |
| 213 | |
| 214 | def _submit_next() -> bool: |
| 215 | gz_path = next(gz_iter, None) |
| 216 | if gz_path is None: |
| 217 | return False |
| 218 | pending.add(executor.submit(_do_one, gz_path)) |
| 219 | return True |
| 220 | |
| 221 | for _ in range(jobs): |
| 222 | if not _submit_next(): |
| 223 | break |
| 224 | |
| 225 | while pending: |
| 226 | done, _ = concurrent.futures.wait( |
| 227 | pending, return_when=concurrent.futures.FIRST_COMPLETED |
| 228 | ) |
| 229 | for future in done: |
| 230 | pending.remove(future) |
| 231 | entry = future.result() # raises FatalExtractionError on fatal |
| 232 | _tally(batch, entry) |
| 233 | if on_result: |
| 234 | on_result(entry.gz_path, entry) |
| 235 | _submit_next() |
| 236 | except KeyboardInterrupt: |
| 237 | n_pending = sum(1 for f in pending if f.running()) |
| 238 | logger.info( |
| 239 | "interrupted by user, waiting for %d pending request(s) to finish", |
| 240 | n_pending, |
| 241 | ) |
| 242 | batch.interrupted = True |
| 243 | extractor.cancel() |
| 244 | executor.shutdown(wait=False, cancel_futures=True) |
no test coverage detected