(self, num_procs, init_method)
| 217 | pytest.skip(skip_msgs[0]) |
| 218 | |
def _launch_non_daemonic_procs(self, num_procs, init_method):
    """Run ``self._dist_run`` in ``num_procs`` spawned, non-daemonic worker processes.

    One ``mp.Process`` per local rank is started with the 'spawn' start
    method. The method then waits (up to ``self.exec_timeout`` seconds) for
    the first worker to finish, joins the rest, and converts worker outcomes
    into pytest outcomes.

    Args:
        num_procs: Number of worker processes to start (one per local rank).
        init_method: Passed through unchanged to ``self._dist_run``.

    Raises (via pytest):
        pytest.exit: no worker finished within ``self.exec_timeout``.
        pytest.fail: a worker hung, was killed by a signal, or exited non-zero.
        pytest.skip: a worker put a skip reason on the shared queue.
    """
    assert not self.reuse_dist_env, "Cannot reuse distributed environment with non-daemonic processes"

    master_port = get_master_port()
    processes = []
    prev_start_method = mp.get_start_method()
    mp.set_start_method('spawn', force=True)
    try:
        # Create the queue only after switching to 'spawn' so its internal
        # locks belong to the same context as the workers; synchronization
        # primitives created under 'fork' cannot be handed to spawned children.
        skip_msg = mp.Queue()  # Lets worker processes share a pytest.skip reason
        for local_rank in range(num_procs):
            p = mp.Process(target=self._dist_run,
                           args=(local_rank, num_procs, master_port, init_method, skip_msg))
            p.start()
            processes.append(p)
    except Exception:
        # Don't leave already-started workers running if a later start fails.
        for p in processes:
            p.terminate()
        raise
    finally:
        # Always restore the caller's start method, even when a start fails.
        mp.set_start_method(prev_start_method, force=True)

    # Now loop and wait for a test to complete. The spin-wait here isn't a big
    # deal because the number of processes will be O(#GPUs) << O(#CPUs).
    deadline = time.monotonic() + self.exec_timeout
    any_done = False
    while not any_done and time.monotonic() < deadline:
        any_done = any(not p.is_alive() for p in processes)
        if not any_done:
            time.sleep(.1)  # So we don't hog CPU

    # If we hit the timeout, then presume a test is hanged
    if not any_done:
        for p in processes:
            p.terminate()
        pytest.exit("Test hanged, exiting", returncode=1)

    # Wait for all other processes to complete
    for p in processes:
        p.join(self.exec_timeout)

    # Terminate every worker that is still running *before* reporting anything:
    # pytest.fail raises, so terminating lazily inside the reporting loop would
    # leave later hung workers alive.
    hung_ranks = set()
    for rank, p in enumerate(processes):
        if p.exitcode is None:
            p.terminate()
            hung_ranks.add(rank)

    # Report the first failure in rank order.
    for rank, p in enumerate(processes):
        if rank in hung_ranks:
            pytest.fail(f'Worker {rank} hung.', pytrace=False)
        if p.exitcode < 0:
            pytest.fail(f'Worker {rank} killed by signal {-p.exitcode}', pytrace=False)
        if p.exitcode > 0:
            pytest.fail(f'Worker {rank} exited with code {p.exitcode}', pytrace=False)

    if not skip_msg.empty():
        # This assumes all skip messages are the same; it may be useful to
        # add a check here to assert all skip messages are equal.
        pytest.skip(skip_msg.get())
| 269 | |
| 270 | def _launch_procs(self, num_procs, init_method): |
| 271 | # Verify we have enough accelerator devices to run this test |
no test coverage detected