MCPcopy
hub / github.com/PaddlePaddle/PaddleFormers / start_local_trainers

Function start_local_trainers

tests/parallel_launch.py:101–164  ·  view source on GitHub ↗
(
    cluster, pod, training_script, training_script_args, log_dir=None, num_nodes=1, hack_output_dir=True
)

Source from the content-addressed store, hash-verified

99
100
101def start_local_trainers(
102 cluster, pod, training_script, training_script_args, log_dir=None, num_nodes=1, hack_output_dir=True
103):
104 current_env = copy.copy(os.environ.copy())
105 # paddle broadcast ncclUniqueId use socket, and
106 # proxy maybe make trainers unreachable, so delete them.
107 # if we set them to "", grpc will log error message "bad uri"
108 # so just delete them.
109 # current_env.pop("http_proxy", None)
110 # current_env.pop("https_proxy", None)
111
112 procs = []
113 for idx, t in enumerate(pod.trainers):
114 local_rank = idx % (len(pod.trainers) // num_nodes)
115 node_rank = idx // (len(pod.trainers) // num_nodes)
116 proc_env = {
117 "FLAGS_selected_gpus": "%s" % ",".join([str(g) for g in t.gpus]),
118 "PADDLE_GLOBAL_SIZE": f"{len(pod.trainers)}",
119 "PADDLE_LOCAL_SIZE": f"{len(pod.trainers)//num_nodes}",
120 "PADDLE_GLOBAL_RANK": f"{idx}",
121 "PADDLE_LOCAL_RANK": f"{local_rank}",
122 "PADDLE_NNODES": f"{num_nodes}",
123 "PADDLE_TRAINER_ENDPOINTS": ",".join(cluster.trainers_endpoints()),
124 # compatible env
125 "PADDLE_CURRENT_ENDPOINT": "%s" % t.endpoint,
126 "PADDLE_TRAINER_ID": "%d" % t.rank,
127 "PADDLE_TRAINERS_NUM": "%d" % cluster.trainers_nranks(),
128 "PADDLE_RANK_IN_NODE": f"{local_rank}",
129 }
130
131 current_env.update(proc_env)
132
133 logger.debug(f"trainer proc env:{current_env}")
134
135 if hack_output_dir and num_nodes > 1:
136 dir_idx = training_script_args.index("--output_dir") + 1
137 script_args = copy.deepcopy(training_script_args)
138 script_args[dir_idx] = f"{script_args[dir_idx]}/node_{node_rank}"
139 else:
140 script_args = copy.deepcopy(training_script_args)
141
142 cmd = [sys.executable, "-u", training_script] + script_args
143
144 logger.info(f"start trainer proc:{cmd} env:{proc_env}")
145
146 fn = None
147 if log_dir is not None:
148 os.makedirs(log_dir, exist_ok=True)
149 fn = open("%s/workerlog.n%d.c%d" % (log_dir, node_rank, local_rank), "a")
150 proc = subprocess.Popen(cmd, env=current_env, stdout=fn, stderr=fn)
151 else:
152 proc = subprocess.Popen(cmd, env=current_env)
153
154 tp = TrainerProc()
155 tp.proc = proc
156 tp.rank = t.rank
157 tp.local_rank = idx
158 tp.log_fn = fn

Callers

nothing calls this directly

Calls 3

info · Method · 0.80
update · Method · 0.45
append · Method · 0.45

Tested by

no test coverage detected