(self, num_procs, init_method)
| 268 | pytest.skip(skip_msg.get()) |
| 269 | |
def _launch_procs(self, num_procs, init_method):
    """Launch ``num_procs`` worker processes for the current distributed test.

    The test is skipped when an accelerator is available but exposes fewer
    devices than ``num_procs``. On XPU devices, non-daemonic processes are
    forced and distributed-environment reuse is disabled. Reuse is also
    disabled when the environment variable ``DS_DISABLE_REUSE_DIST_ENV`` is
    exactly ``'1'``. The multiprocessing start method is then forced to
    ``forkserver``. Finally, the call is dispatched to the non-daemonic or
    daemonic launcher according to ``self.non_daemonic_procs``.

    Args:
        num_procs: Number of worker processes to launch. This is also the
            minimum number of accelerator devices required when an
            accelerator is available.
        init_method: Passed through unchanged to the selected launcher.

    Side effects:
        May set ``self.non_daemonic_procs`` and ``self.reuse_dist_env``.
        Calls ``mp.set_start_method(..., force=True)``, which changes the
        start method for the whole interpreter.
    """
    # Look the accelerator up once instead of calling get_accelerator()
    # separately for every query below.
    accelerator = get_accelerator()

    # Verify we have enough accelerator devices to run this test. The device
    # count is only queried when an accelerator is available, and the same
    # value is used for both the comparison and the skip message.
    if accelerator.is_available():
        device_count = accelerator.device_count()
        if device_count < num_procs:
            pytest.skip(
                f"Skipping test because not enough GPUs are available: {num_procs} required, {device_count} available"
            )

    if accelerator.device_name() == 'xpu':
        # XPU runs always use non-daemonic processes and never reuse the
        # distributed environment.
        self.non_daemonic_procs = True
        self.reuse_dist_env = False

    # Allow disabling reuse_dist_env via environment variable.
    # This is useful for CI full test runs where reusing distributed environment
    # can cause pool worker cleanup to hang after tests complete.
    if os.environ.get('DS_DISABLE_REUSE_DIST_ENV', '0') == '1':
        self.reuse_dist_env = False

    # Always use the `forkserver` start method. force=True overrides any start
    # method already set earlier in this interpreter.
    mp.set_start_method('forkserver', force=True)

    if self.non_daemonic_procs:
        self._launch_non_daemonic_procs(num_procs, init_method)
    else:
        self._launch_daemonic_procs(num_procs, init_method)
| 295 | def _dist_run(self, local_rank, num_procs, master_port, init_method, skip_msg=""): |
| 296 | if dist.is_initialized(): |
no test coverage detected