(self, num_procs, init_method)
| 178 | return fixture_kwargs |
| 179 | |
def _launch_daemonic_procs(self, num_procs, init_method):
    """Run ``self._dist_run`` once per local rank on a ``multiprocessing`` pool.

    The pool is either freshly created for this call or, when
    ``self.reuse_dist_env`` is set, taken from ``self._pool_cache`` keyed by
    ``num_procs``. The workers' results are awaited for at most
    ``self.exec_timeout`` seconds, the pool is handed to ``self._close_pool``,
    and a skip reported by the workers is re-raised in this process.

    Args:
        num_procs: Number of worker processes; ranks ``0 .. num_procs - 1``
            are dispatched.
        init_method: Forwarded unchanged to ``self._dist_run`` for every rank.

    Raises:
        AssertionError: If the workers report differing skip messages.

    Side effects:
        - On HPU, ``self.reuse_dist_env`` is cleared.
        - On timeout, pytest is terminated via ``pytest.exit`` with
          return code 1.
        - When the workers report a skip, ``pytest.skip`` is called.
    """
    # Pool reuse is disabled on HPU: a notice is printed and the flag is
    # cleared, so every later call on this object builds a fresh pool.
    if get_accelerator().device_name() == 'hpu' and self.reuse_dist_env:
        print("Ignoring reuse_dist_env for hpu")
        self.reuse_dist_env = False

    if not self.reuse_dist_env:
        # One-off pool for this call, with its own master port.
        worker_pool = mp.Pool(processes=num_procs)
        master_port = get_master_port()
    elif num_procs in self._pool_cache:
        # Cached pool from an earlier call: no new master port is chosen.
        # NOTE(review): workers presumably keep their earlier distributed
        # setup, so None is passed through here; confirm in _dist_run.
        worker_pool = self._pool_cache[num_procs]
        master_port = None
    else:
        # First use of this pool size: create, cache, then pick a port.
        worker_pool = mp.Pool(processes=num_procs)
        self._pool_cache[num_procs] = worker_pool
        master_port = get_master_port()

    rank_args = [(rank, num_procs, master_port, init_method) for rank in range(num_procs)]
    pending = worker_pool.starmap_async(self._dist_run, rank_args)

    try:
        rank_skip_msgs = pending.get(self.exec_timeout)
    except mp.TimeoutError:
        # A hang typically signals a broken environment that would stall
        # every following test too, so abort the whole pytest session.
        pytest.exit("Test hanged, exiting", returncode=1)
    finally:
        # Teardown runs on success, timeout, and worker errors alike.
        # NOTE(review): on timeout the workers may still be blocked; confirm
        # that _close_pool cannot itself block on them.
        self._close_pool(worker_pool, num_procs)

    # A falsy result from every rank means nothing was skipped.
    if not any(rank_skip_msgs):
        return
    # Otherwise all ranks must agree on a single message, which is then
    # re-raised here so pytest records the skip for this test.
    assert len(set(rank_skip_msgs)) == 1, "Multiple different skip messages received"
    pytest.skip(rank_skip_msgs[0])
| 218 | |
| 219 | def _launch_non_daemonic_procs(self, num_procs, init_method): |
| 220 | assert not self.reuse_dist_env, "Cannot reuse distributed environment with non-daemonic processes" |
no test coverage detected