Execute jobs for the given inputs in parallel subprocesses. :param int max_jobs: The maximum number of parallel jobs to spawn. :param inputs: An iterable of the data to parallelize over `spawn_func`. :param spawn_func: A function taking a single input and returning a :class:`SpawnedJob`.
(
inputs, # type: Iterable[_I]
spawn_func, # type: Callable[[_I], SpawnedJob[_O]]
error_handler=None, # type: Optional[ErrorHandler[_I, _SE, _JE]]
max_jobs=None, # type: Optional[int]
)
| 529 | |
| 530 | |
| 531 | def execute_parallel( |
| 532 | inputs, # type: Iterable[_I] |
| 533 | spawn_func, # type: Callable[[_I], SpawnedJob[_O]] |
| 534 | error_handler=None, # type: Optional[ErrorHandler[_I, _SE, _JE]] |
| 535 | max_jobs=None, # type: Optional[int] |
| 536 | ): |
| 537 | # type: (...) -> Iterator[Union[_O, _SE, _JE]] |
| 538 | """Execute jobs for the given inputs in parallel subprocesses. |
| 539 | |
| 540 | :param int max_jobs: The maximum number of parallel jobs to spawn. |
| 541 | :param inputs: An iterable of the data to parallelize over `spawn_func`. |
| 542 | :param spawn_func: A function taking a single input and returning a :class:`SpawnedJob`. |
| 543 | :param error_handler: An optional :class:`ErrorHandler`, defaults to :class:`Log`. |
| 544 | :returns: An iterator over the spawned job results as they come in. |
| 545 | :raises: A `raise_type` exception if any individual job errors and `raise_type` is not `None`. |
| 546 | """ |
| 547 | handler = ( |
| 548 | error_handler or Log["_I", "_O"]() |
| 549 | ) # type: Union[ErrorHandler[_I, _SE, _JE], Log[_I, _O]] |
| 550 | size = _sanitize_max_jobs(max_jobs) |
| 551 | TRACER.log( |
| 552 | "Spawning a maximum of {} parallel jobs to process:\n {}".format( |
| 553 | size, "\n ".join(map(str, inputs)) |
| 554 | ), |
| 555 | V=9, |
| 556 | ) |
| 557 | |
@attr.s(frozen=True)
class Spawn(object):
    """Immutable record pairing an input item with the job spawned for it."""

    # The input that was handed to the spawn function.
    item = attr.ib()  # type: Any

    # The job the spawn function returned for `item`.
    spawned_job = attr.ib()  # type: SpawnedJob
| 562 | |
@attr.s(frozen=True)
class SpawnError(object):
    """Immutable record pairing an input item with the error raised while spawning its job."""

    # The input whose spawn attempt failed.
    item = attr.ib()  # type: Any

    # The exception raised by the spawn attempt for `item`.
    error = attr.ib()  # type: Exception
| 567 | |
| 568 | stop = Event() # Used as a signal to stop spawning further jobs once any one job fails. |
| 569 | job_slots = BoundedSemaphore(value=size) |
| 570 | |
class DoneSentinel(object):
    """Marker type: an instance is enqueued after the last spawn result to signal completion."""
| 573 | |
| 574 | done_sentinel = DoneSentinel() |
| 575 | spawn_queue = Queue() # type: Queue[Union[Spawn, SpawnError, DoneSentinel]] |
| 576 | |
def spawn_jobs():
    # type: () -> None
    """Spawn one job per input item and enqueue each outcome for the consumer.

    Reads `inputs`, `spawn_func`, `stop`, `job_slots`, `spawn_queue` and `done_sentinel` from the
    enclosing scope. For every item a job slot is acquired and `spawn_func(item)` is called: a
    successful spawn is enqueued as a `Spawn`, and an `Exception` raised by `spawn_func` is
    enqueued as a `SpawnError`. Iteration ends early once `stop` is set. When the loop finishes,
    `done_sentinel` is enqueued.

    Job slots are only acquired here, never released; presumably the consumer of `spawn_queue`
    releases them as jobs complete -- confirm against the rest of `execute_parallel`.

    A `BaseException` that is not an `Exception` (e.g. `KeyboardInterrupt`) raised by
    `spawn_func` propagates unchanged and nothing is enqueued for that item.
    """
    for item in inputs:
        if stop.is_set():
            break
        # Blocks until one of the bounded job slots is free.
        job_slots.acquire()
        try:
            result = Spawn(item, spawn_func(item))
        except Exception as e:
            result = SpawnError(item, e)
        # Deliberately not in a `finally`: when `spawn_func` raises a non-`Exception`
        # `BaseException` there is no result for this item, and a `finally` would either trip
        # over an unbound `result` (masking the real error) or re-enqueue the previous item's
        # result as a duplicate.
        spawn_queue.put(result)
    spawn_queue.put(done_sentinel)
no test coverage detected