MCPcopy Index your code
hub / github.com/pathwaycom/pathway / wait_result_with_checker

Function wait_result_with_checker

python/pathway/tests/utils.py:688–778  ·  view source on GitHub ↗
(
    checker: Callable[[], bool],
    timeout_sec: float,
    *,
    step: float = 0.1,
    double_check_interval: float | None = None,
    target: Callable[..., None] | None = run,
    processes: int = 1,
    first_port: int | None = None,
    args: Iterable[Any] = (),
    kwargs: Mapping[str, Any] = {},
)

Source from the content-addressed store, hash-verified

686
687
688def wait_result_with_checker(
689 checker: Callable[[], bool],
690 timeout_sec: float,
691 *,
692 step: float = 0.1,
693 double_check_interval: float | None = None,
694 target: Callable[..., None] | None = run,
695 processes: int = 1,
696 first_port: int | None = None,
697 args: Iterable[Any] = (),
698 kwargs: Mapping[str, Any] = {},
699) -> None:
700 handles: list[multiprocessing.Process] = []
701 try:
702 if target is not None:
703 assert (
704 multiprocessing.get_start_method() == "fork"
705 ), "multiprocessing does not use fork(), pw.run() will not work"
706
707 if processes != 1:
708 assert first_port is not None
709 run_id = uuid.uuid4()
710
711 def target_wrapped(process_id, *args, **kwargs):
712 os.environ["PATHWAY_PROCESSES"] = str(processes)
713 os.environ["PATHWAY_FIRST_PORT"] = str(first_port)
714 os.environ["PATHWAY_PROCESS_ID"] = str(process_id)
715 os.environ["PATHWAY_RUN_ID"] = str(run_id)
716 target(*args, **kwargs)
717
718 for process_id in range(processes):
719 p = multiprocessing.Process(
720 target=target_wrapped, args=(process_id, *args), kwargs=kwargs
721 )
722 p.start()
723 handles.append(p)
724 else:
725 target_wrapped = target
726 p = multiprocessing.Process(target=target, args=args, kwargs=kwargs)
727 p.start()
728 handles.append(p)
729
730 succeeded = False
731 start_time = time.monotonic()
732 while True:
733 time.sleep(step)
734
735 elapsed = time.monotonic() - start_time
736 if elapsed >= timeout_sec:
737 print("Timed out", file=sys.stderr)
738 break
739
740 succeeded = checker()
741 if succeeded:
742 print(
743 f"Correct result obtained after {elapsed:.1f} seconds",
744 file=sys.stderr,
745 )

Calls 4

any · Function · 0.85
checker · Function · 0.70
start · Method · 0.45
join · Method · 0.45