Submit distributed jobs (server and client processes) via ssh
(args, udf_command)
| 383 | |
| 384 | |
def submit_jobs(args, udf_command):
    """Submit distributed jobs via ssh and wait for them to finish.

    Reads the cluster hosts from ``args.ip_config`` and launches
    ``args.num_proc_per_machine`` copies of ``udf_command`` on every host over
    ssh. Each copy gets its own ``RANK``, plus ``MASTER_ADDR`` (the first
    host's address) and ``MASTER_PORT`` (``args.master_port``) in its
    environment. The call then blocks until every launched job has finished.

    A separate cleanup process is started so that a SIGINT delivered to the
    launcher can ask it to kill the remote training jobs.

    Args:
        args: Parsed launcher arguments. The attributes read are
            ``ip_config``, ``num_proc_per_machine``, ``master_port``,
            ``ssh_port`` and ``ssh_username``.
        udf_command: The user command to run for every launched process.

    Raises:
        RuntimeError: If a non-blank line of the ip_config file does not have
            exactly one (``ip``) or two (``ip port``) whitespace-separated
            fields, or if the file lists no hosts at all.
        ValueError: If a port field in the ip_config file is not an integer.
    """
    hosts = []
    thread_list = []

    # Parse the cluster description: one host per line, "ip" or "ip port".
    ip_config = args.ip_config
    with open(ip_config) as f:
        for line in f:
            # str.split() with no separator already ignores surrounding whitespace.
            fields = line.split()
            if not fields:
                # Tolerate blank lines (e.g. trailing empty lines) instead of
                # rejecting the whole file as malformed.
                continue
            if len(fields) == 2:
                hosts.append((fields[0], int(fields[1])))
            elif len(fields) == 1:
                hosts.append((fields[0], DEFAULT_PORT))
            else:
                raise RuntimeError("Format error of ip_config.")
    if not hosts:
        # Without this check an empty config would launch nothing and
        # silently "succeed".
        raise RuntimeError(f"No hosts found in ip_config {ip_config!r}.")
    server_count_per_machine = args.num_proc_per_machine

    # Launch the tasks.
    server_env_vars = construct_dgl_server_env_vars(
        ip_config=args.ip_config,
        num_proc_per_machine=args.num_proc_per_machine,
        pythonpath=os.environ.get("PYTHONPATH", ""),
    )
    # Every process rendezvouses at the first host listed in ip_config.
    master_addr = hosts[0][0]
    for i in range(len(hosts) * server_count_per_machine):
        # Ranks are assigned host-major: host k runs ranks
        # [k * server_count_per_machine, (k + 1) * server_count_per_machine).
        ip, _ = hosts[i // server_count_per_machine]
        server_env_vars_cur = (
            f"{server_env_vars} RANK={i} "
            f"MASTER_ADDR={master_addr} MASTER_PORT={args.master_port}"
        )
        cmd = wrap_cmd_with_local_envvars(udf_command, server_env_vars_cur)
        print(cmd)
        thread_list.append(
            execute_remote(cmd, ip, args.ssh_port, username=args.ssh_username)
        )

    # Start a cleanup process dedicated to cleaning up remote training jobs.
    # NOTE(review): unlike execute_remote above, get_all_remote_pids is not
    # given args.ssh_username; confirm it connects as the intended user.
    conn1, conn2 = multiprocessing.Pipe()
    func = partial(get_all_remote_pids, hosts, args.ssh_port, udf_command)
    process = multiprocessing.Process(target=cleanup_proc, args=(func, conn1))
    process.start()

    # The parameter is named ``signum`` so it does not shadow the ``signal``
    # module.
    def signal_handler(signum, frame):
        logging.info("Stop launcher")
        # We need to tell the cleanup process to kill remote training jobs.
        conn2.send("cleanup")
        sys.exit(0)

    signal.signal(signal.SIGINT, signal_handler)

    for thread in thread_list:
        thread.join()
    # The training processes are complete; tell the cleanup process to exit.
    conn2.send("exit")
    process.join()
no test coverage detected