| 105 | |
| 106 | |
def create_process_handles(
    *,
    processes: int,
    threads: int,
    first_port: int,
    addresses: str | None = None,
    process_id: int | None = None,
    run_id: str,
    env_base: dict[str, str],
    program: str,
    arguments: Iterable[str],
) -> list[subprocess.Popen]:
    """Spawn the worker processes and return their ``Popen`` handles.

    Every child is started as ``[program, *arguments]`` with a copy of
    ``env_base`` extended by ``PATHWAY_THREADS``, ``PATHWAY_PROCESSES``,
    ``PATHWAY_RUN_ID``, ``PATHWAY_SUPPRESS_OTHER_WORKER_ERRORS`` and
    ``PATHWAY_PROCESS_ID``.

    * When ``addresses`` is ``None``, ``processes`` children are started with
      ids ``0 .. processes - 1`` and ``PATHWAY_FIRST_PORT`` set to
      ``first_port``.
    * When ``addresses`` is given, the "multiple-machines" entitlement is
      checked, ``PATHWAY_ADDRESSES`` is set instead of ``PATHWAY_FIRST_PORT``,
      and a single child with id ``process_id`` is started.

    Args:
        processes: Number of processes, exported as ``PATHWAY_PROCESSES``.
        threads: Threads per process, exported as ``PATHWAY_THREADS``.
        first_port: Exported as ``PATHWAY_FIRST_PORT`` when ``addresses`` is
            not given.
        addresses: Exported verbatim as ``PATHWAY_ADDRESSES`` when given.
        process_id: Id of the single child started when ``addresses`` is
            given; required in that case.
        run_id: Exported as ``PATHWAY_RUN_ID``.
        env_base: Base environment; it is copied, never mutated.
        program: Executable to run.
        arguments: Arguments passed to ``program``; may be any iterable,
            including a one-shot iterator.

    Returns:
        The handles of the started processes, in spawn order.

    Raises:
        ValueError: If ``addresses`` is given but ``process_id`` is ``None``.
    """
    if addresses is not None and process_id is None:
        # Without this check the child would be handed the literal string
        # "None" as its PATHWAY_PROCESS_ID.
        raise ValueError("process_id must be provided when addresses is set")

    processes_str = plural(processes, "process", "processes")
    workers_str = plural(processes * threads, "total worker", "total workers")
    click.echo(f"Preparing {processes_str} ({workers_str})", err=True)

    env_common = env_base.copy()
    env_common["PATHWAY_THREADS"] = str(threads)
    env_common["PATHWAY_PROCESSES"] = str(processes)
    env_common["PATHWAY_RUN_ID"] = str(run_id)
    env_common["PATHWAY_SUPPRESS_OTHER_WORKER_ERRORS"] = "1"
    if addresses is not None:
        _check_entitlements("multiple-machines")
        env_common["PATHWAY_ADDRESSES"] = addresses
    else:
        env_common["PATHWAY_FIRST_PORT"] = str(first_port)

    # Materialize the command once: `arguments` may be a one-shot iterator,
    # which would otherwise be empty for every process after the first.
    command = [program, *arguments]

    process_ids = [process_id] if addresses is not None else range(processes)
    process_handles: list[subprocess.Popen] = []
    try:
        for pid in process_ids:
            env = env_common.copy()
            env["PATHWAY_PROCESS_ID"] = str(pid)
            process_handles.append(subprocess.Popen(command, env=env))
    except BaseException:
        # The caller never receives the handles if spawning fails part-way,
        # so stop the children that were already started before re-raising.
        for handle in process_handles:
            handle.terminate()
        raise

    return process_handles
| 143 | |
| 144 | |
| 145 | def wait_for_process_handles( |