| 686 | |
| 687 | |
| 688 | def wait_result_with_checker( |
| 689 | checker: Callable[[], bool], |
| 690 | timeout_sec: float, |
| 691 | *, |
| 692 | step: float = 0.1, |
| 693 | double_check_interval: float | None = None, |
| 694 | target: Callable[..., None] | None = run, |
| 695 | processes: int = 1, |
| 696 | first_port: int | None = None, |
| 697 | args: Iterable[Any] = (), |
| 698 | kwargs: Mapping[str, Any] = {}, |
| 699 | ) -> None: |
| 700 | handles: list[multiprocessing.Process] = [] |
| 701 | try: |
| 702 | if target is not None: |
| 703 | assert ( |
| 704 | multiprocessing.get_start_method() == "fork" |
| 705 | ), "multiprocessing does not use fork(), pw.run() will not work" |
| 706 | |
| 707 | if processes != 1: |
| 708 | assert first_port is not None |
| 709 | run_id = uuid.uuid4() |
| 710 | |
| 711 | def target_wrapped(process_id, *args, **kwargs): |
| 712 | os.environ["PATHWAY_PROCESSES"] = str(processes) |
| 713 | os.environ["PATHWAY_FIRST_PORT"] = str(first_port) |
| 714 | os.environ["PATHWAY_PROCESS_ID"] = str(process_id) |
| 715 | os.environ["PATHWAY_RUN_ID"] = str(run_id) |
| 716 | target(*args, **kwargs) |
| 717 | |
| 718 | for process_id in range(processes): |
| 719 | p = multiprocessing.Process( |
| 720 | target=target_wrapped, args=(process_id, *args), kwargs=kwargs |
| 721 | ) |
| 722 | p.start() |
| 723 | handles.append(p) |
| 724 | else: |
| 725 | target_wrapped = target |
| 726 | p = multiprocessing.Process(target=target, args=args, kwargs=kwargs) |
| 727 | p.start() |
| 728 | handles.append(p) |
| 729 | |
| 730 | succeeded = False |
| 731 | start_time = time.monotonic() |
| 732 | while True: |
| 733 | time.sleep(step) |
| 734 | |
| 735 | elapsed = time.monotonic() - start_time |
| 736 | if elapsed >= timeout_sec: |
| 737 | print("Timed out", file=sys.stderr) |
| 738 | break |
| 739 | |
| 740 | succeeded = checker() |
| 741 | if succeeded: |
| 742 | print( |
| 743 | f"Correct result obtained after {elapsed:.1f} seconds", |
| 744 | file=sys.stderr, |
| 745 | ) |