MCPcopy
hub / github.com/pex-tool/pex / execute_parallel

Function execute_parallel

pex/jobs.py:531–655  ·  view source on GitHub ↗

Execute jobs for the given inputs in parallel subprocesses. :param int max_jobs: The maximum number of parallel jobs to spawn. :param inputs: An iterable of the data to parallelize over `spawn_func`. :param spawn_func: A function taking a single input and returning a :class:`SpawnedJob`

(
    inputs,  # type: Iterable[_I]
    spawn_func,  # type: Callable[[_I], SpawnedJob[_O]]
    error_handler=None,  # type: Optional[ErrorHandler[_I, _SE, _JE]]
    max_jobs=None,  # type: Optional[int]
)

Source from the content-addressed store, hash-verified

529
530
531def execute_parallel(
532 inputs, # type: Iterable[_I]
533 spawn_func, # type: Callable[[_I], SpawnedJob[_O]]
534 error_handler=None, # type: Optional[ErrorHandler[_I, _SE, _JE]]
535 max_jobs=None, # type: Optional[int]
536):
537 # type: (...) -> Iterator[Union[_O, _SE, _JE]]
538 """Execute jobs for the given inputs in parallel subprocesses.
539
540 :param int max_jobs: The maximum number of parallel jobs to spawn.
541 :param inputs: An iterable of the data to parallelize over `spawn_func`.
542 :param spawn_func: A function taking a single input and returning a :class:`SpawnedJob`.
543 :param error_handler: An optional :class:`ErrorHandler`, defaults to :class:`Log`.
544 :returns: An iterator over the spawned job results as they come in.
545 :raises: A `raise_type` exception if any individual job errors and `raise_type` is not `None`.
546 """
547 handler = (
548 error_handler or Log["_I", "_O"]()
549 ) # type: Union[ErrorHandler[_I, _SE, _JE], Log[_I, _O]]
550 size = _sanitize_max_jobs(max_jobs)
551 TRACER.log(
552 "Spawning a maximum of {} parallel jobs to process:\n {}".format(
553 size, "\n ".join(map(str, inputs))
554 ),
555 V=9,
556 )
557
558 @attr.s(frozen=True)
559 class Spawn(object):
560 item = attr.ib() # type: Any
561 spawned_job = attr.ib() # type: SpawnedJob
562
563 @attr.s(frozen=True)
564 class SpawnError(object):
565 item = attr.ib() # type: Any
566 error = attr.ib() # type: Exception
567
568 stop = Event() # Used as a signal to stop spawning further jobs once any one job fails.
569 job_slots = BoundedSemaphore(value=size)
570
571 class DoneSentinel(object):
572 pass
573
574 done_sentinel = DoneSentinel()
575 spawn_queue = Queue() # type: Queue[Union[Spawn, SpawnError, DoneSentinel]]
576
577 def spawn_jobs():
578 for item in inputs:
579 if stop.is_set():
580 break
581 job_slots.acquire()
582 try:
583 result = Spawn(item, spawn_func(item))
584 except Exception as e:
585 result = SpawnError(item, e)
586 finally:
587 spawn_queue.put(result)
588 spawn_queue.put(done_sentinel)

Callers 9

build_pex_sciesFunction · 0.90
generate_reportsMethod · 0.90
build_wheelsMethod · 0.90
prune_pip_cachesMethod · 0.90
collect_requirementsMethod · 0.90
fingerprintMethod · 0.90
lockMethod · 0.90

Calls 11

_sanitize_max_jobsFunction · 0.85
DoneSentinelClass · 0.85
spawned_jobsFunction · 0.85
logMethod · 0.80
joinMethod · 0.45
getMethod · 0.45
handle_spawn_errorMethod · 0.45
killMethod · 0.45
await_resultMethod · 0.45
handle_job_errorMethod · 0.45
releaseMethod · 0.45

Tested by

no test coverage detected