| 99 | |
| 100 | |
| 101 | def start_local_trainers( |
| 102 | cluster, pod, training_script, training_script_args, log_dir=None, num_nodes=1, hack_output_dir=True |
| 103 | ): |
| 104 | current_env = copy.copy(os.environ.copy()) |
| 105 | # Paddle broadcasts the ncclUniqueId over a socket, and a proxy |
| 106 | # may make the trainers unreachable, so the proxy variables should be deleted. |
| 107 | # If they are set to "" instead, grpc logs a "bad uri" error message, |
| 108 | # so delete them rather than blanking them. |
| 109 | # current_env.pop("http_proxy", None) |
| 110 | # current_env.pop("https_proxy", None) |
| 111 | |
| 112 | procs = [] |
| 113 | for idx, t in enumerate(pod.trainers): |
| 114 | local_rank = idx % (len(pod.trainers) // num_nodes) |
| 115 | node_rank = idx // (len(pod.trainers) // num_nodes) |
| 116 | proc_env = { |
| 117 | "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]), |
| 118 | "PADDLE_GLOBAL_SIZE": f"{len(pod.trainers)}", |
| 119 | "PADDLE_LOCAL_SIZE": f"{len(pod.trainers)//num_nodes}", |
| 120 | "PADDLE_GLOBAL_RANK": f"{idx}", |
| 121 | "PADDLE_LOCAL_RANK": f"{local_rank}", |
| 122 | "PADDLE_NNODES": f"{num_nodes}", |
| 123 | "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()), |
| 124 | # compatible env |
| 125 | "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint, |
| 126 | "PADDLE_TRAINER_ID": "%d" % t.rank, |
| 127 | "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(), |
| 128 | "PADDLE_RANK_IN_NODE": f"{local_rank}", |
| 129 | } |
| 130 | |
| 131 | current_env.update(proc_env) |
| 132 | |
| 133 | logger.debug(f"trainer proc env:{current_env}") |
| 134 | |
| 135 | if hack_output_dir and num_nodes > 1: |
| 136 | dir_idx = training_script_args.index("--output_dir") + 1 |
| 137 | script_args = copy.deepcopy(training_script_args) |
| 138 | script_args[dir_idx] = f"{script_args[dir_idx]}/node_{node_rank}" |
| 139 | else: |
| 140 | script_args = copy.deepcopy(training_script_args) |
| 141 | |
| 142 | cmd = [sys.executable, "-u", training_script] + script_args |
| 143 | |
| 144 | logger.info(f"start trainer proc:{cmd} env:{proc_env}") |
| 145 | |
| 146 | fn = None |
| 147 | if log_dir is not None: |
| 148 | os.makedirs(log_dir, exist_ok=True) |
| 149 | fn = open("%s/workerlog.n%d.c%d" % (log_dir, node_rank, local_rank), "a") |
| 150 | proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn) |
| 151 | else: |
| 152 | proc = subprocess.Popen(cmd, env=current_env) |
| 153 | |
| 154 | tp = TrainerProc() |
| 155 | tp.proc = proc |
| 156 | tp.rank = t.rank |
| 157 | tp.local_rank = idx |
| 158 | tp.log_fn = fn |