MCPcopy
hub / github.com/deepspeedai/DeepSpeed / _launch_non_daemonic_procs

Method _launch_non_daemonic_procs

tests/unit/common.py:219–268  ·  view source on GitHub ↗
(self, num_procs, init_method)

Source from the content-addressed store, hash-verified

217 pytest.skip(skip_msgs[0])
218
def _launch_non_daemonic_procs(self, num_procs, init_method):
    """Run ``self._dist_run`` in ``num_procs`` spawned, non-daemonic worker processes.

    Each worker receives its local rank, the world size, a shared master port,
    ``init_method`` and a queue through which it can hand a ``pytest.skip``
    reason back to the parent. The parent polls until at least one worker has
    exited (or ``self.exec_timeout`` seconds pass), then joins the remaining
    workers and converts any abnormal exit into a pytest outcome.

    Args:
        num_procs: number of worker processes to start (one per local rank).
        init_method: forwarded unchanged to ``self._dist_run``.

    Outcomes:
        Calls ``pytest.exit`` if no worker finishes within ``self.exec_timeout``;
        ``pytest.fail`` if a worker is still running after its join timeout or
        exited with a non-zero code; ``pytest.skip`` if a worker queued a skip
        reason.
    """
    assert not self.reuse_dist_env, "Cannot reuse distributed environment with non-daemonic processes"

    master_port = get_master_port()
    skip_msg = mp.Queue()  # Lets the spawned workers hand a pytest.skip reason back to the parent
    processes = []

    # Start the workers with the 'spawn' method. The previous global start
    # method is restored even if starting a worker raises, and any workers
    # already started are terminated so they cannot outlive the test.
    prev_start_method = mp.get_start_method()
    mp.set_start_method('spawn', force=True)
    try:
        for local_rank in range(num_procs):
            p = mp.Process(target=self._dist_run, args=(local_rank, num_procs, master_port, init_method, skip_msg))
            p.start()
            processes.append(p)
    except BaseException:
        for p in processes:
            if p.is_alive():
                p.terminate()
        raise
    finally:
        mp.set_start_method(prev_start_method, force=True)

    # Spin-wait until any worker finishes. Polling is cheap here because the
    # number of processes is O(#GPUs) << O(#CPUs).
    any_done = False
    start = time.time()
    while not any_done and (time.time() - start) < self.exec_timeout:
        any_done = any(not p.is_alive() for p in processes)
        if not any_done:
            time.sleep(.1)  # So we don't hog CPU

    # If no worker finished before the timeout, presume the test hung.
    if not any_done:
        for p in processes:
            p.terminate()
        pytest.exit("Test hanged, exiting", returncode=1)

    # Wait for all other processes to complete. Note: each join gets its own
    # timeout, so the worst-case total wait is num_procs * exec_timeout.
    for p in processes:
        p.join(self.exec_timeout)

    # Snapshot exit codes before tearing anything down, so a worker that is
    # still running is reported as hung rather than as killed by our SIGTERM.
    exitcodes = [p.exitcode for p in processes]

    # Terminate every still-running worker *before* reporting. pytest.fail
    # raises, so terminating inside the reporting loop would leave later hung
    # (non-daemonic) workers alive after the first failure is reported.
    for p in processes:
        if p.is_alive():
            p.terminate()

    for rank, exitcode in enumerate(exitcodes):
        if exitcode is None:
            pytest.fail(f'Worker {rank} hung.', pytrace=False)
        if exitcode < 0:
            pytest.fail(f'Worker {rank} killed by signal {-exitcode}', pytrace=False)
        if exitcode > 0:
            pytest.fail(f'Worker {rank} exited with code {exitcode}', pytrace=False)

    if not skip_msg.empty():
        # This assumes all skip messages are the same; it may be useful to
        # add a check here asserting that every worker's skip message is equal.
        pytest.skip(skip_msg.get())
269
270 def _launch_procs(self, num_procs, init_method):
271 # Verify we have enough accelerator devices to run this test

Callers 1

_launch_procs — Method · 0.95

Calls 4

get_master_port — Function · 0.85
append — Method · 0.80
start — Method · 0.45
get — Method · 0.45

Tested by

no test coverage detected