MCPcopy
hub / github.com/dmlc/dgl / submit_jobs

Function submit_jobs

tools/distgraphlaunch.py:385–442  ·  view source on GitHub ↗

Submit distributed jobs (server and client processes) via ssh

(args, udf_command)

Source from the content-addressed store, hash-verified

383
384
def submit_jobs(args, udf_command):
    """Submit distributed jobs (server and client processes) via ssh.

    Reads the cluster description from ``args.ip_config`` and launches
    ``args.num_proc_per_machine`` copies of ``udf_command`` on every listed
    host via ``execute_remote``. It then blocks until all launched tasks have
    been joined. A helper process running ``cleanup_proc`` is started so that,
    when the launcher receives SIGINT, it can be told (via a pipe) to clean up
    the remote training jobs.

    Args:
        args: Parsed launcher arguments. This function reads ``ip_config``,
            ``num_proc_per_machine``, ``master_port``, ``ssh_port`` and
            ``ssh_username``.
        udf_command: The user command to run on the remote hosts.

    Raises:
        RuntimeError: If a non-blank line of the ip_config file does not have
            exactly one (``ip``) or two (``ip port``) whitespace-separated
            fields, or if the file lists no hosts at all.
    """
    # Parse the cluster description: each non-blank line is "ip" or "ip port".
    hosts = []
    with open(args.ip_config, encoding="utf-8") as f:
        for line in f:
            fields = line.split()
            if not fields:
                # Tolerate blank lines instead of rejecting the whole file.
                continue
            if len(fields) == 2:
                hosts.append((fields[0], int(fields[1])))
            elif len(fields) == 1:
                hosts.append((fields[0], DEFAULT_PORT))
            else:
                raise RuntimeError("Format error of ip_config.")
    if not hosts:
        # Without at least one host there is no MASTER_ADDR and nothing to run.
        raise RuntimeError("ip_config does not list any hosts.")

    server_count_per_machine = args.num_proc_per_machine
    # The first host listed in ip_config acts as the master for every rank.
    master_addr = hosts[0][0]

    # Launch the server tasks.
    thread_list = []
    server_env_vars = construct_dgl_server_env_vars(
        ip_config=args.ip_config,
        num_proc_per_machine=args.num_proc_per_machine,
        pythonpath=os.environ.get("PYTHONPATH", ""),
    )
    for host_idx, (ip, _) in enumerate(hosts):
        for local_rank in range(server_count_per_machine):
            # Global ranks are assigned host by host, contiguously.
            rank = host_idx * server_count_per_machine + local_rank
            server_env_vars_cur = (
                f"{server_env_vars} RANK={rank} "
                f"MASTER_ADDR={master_addr} MASTER_PORT={args.master_port}"
            )
            cmd = wrap_cmd_with_local_envvars(udf_command, server_env_vars_cur)
            print(cmd)
            thread_list.append(
                execute_remote(cmd, ip, args.ssh_port, username=args.ssh_username)
            )

    # Start a cleanup process dedicated to cleaning up remote training jobs.
    conn1, conn2 = multiprocessing.Pipe()
    func = partial(get_all_remote_pids, hosts, args.ssh_port, udf_command)
    process = multiprocessing.Process(target=cleanup_proc, args=(func, conn1))
    process.start()

    def signal_handler(signum, frame):
        logging.info("Stop launcher")
        # Tell the cleanup process to kill the remote training jobs.
        conn2.send("cleanup")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    for thread in thread_list:
        thread.join()
    # The training processes have completed; tell the cleanup process to exit.
    conn2.send("exit")
    process.join()

Callers 1

main (Function) · 0.70

Calls 7

append (Method) · 0.80
start (Method) · 0.80
execute_remote (Function) · 0.70
get (Method) · 0.45
join (Method) · 0.45

Tested by

no test coverage detected