| 257 | |
| 258 | |
def _ddp_runner(proc_id, nprocs, g, data, args):
    """Per-rank worker that checks ``DataLoader(use_ddp=True)`` partitioning.

    Args:
        proc_id: Rank of this process. In every mode other than ``"cpu"`` it
            is also used as the CUDA device ordinal.
        nprocs: World size passed to ``dist.init_process_group``.
        g: Graph to sample from; moved to GPU when ``mode == "pure_gpu"``.
        data: ``(indices, sampler)`` -- seed indices and the sampler handed
            to ``dgl.dataloading.DataLoader``.
        args: ``(mode, drop_last)``. ``mode`` selects device placement:
            ``"cpu"`` (CPU device, Gloo backend, CPU indices),
            ``"uva_cpu_indices"`` (CPU indices), ``"pure_gpu"`` (graph moved
            to GPU); any mode starting with ``"uva"`` enables UVA sampling.
            ``drop_last`` is forwarded to the loader.

    For each ``num_workers`` setting, the loader is iterated, the largest
    destination node ID seen on the last-layer block of each batch is
    collected, MAX-reduced onto rank 0, and (when ``drop_last`` is set and
    shuffling is off) compared against a closed-form expected value.

    NOTE(review): no ``dist.destroy_process_group()`` is visible in this
    function body; confirm the process group is torn down elsewhere.
    """
    mode, drop_last = args
    indices, sampler = data
    if mode == "cpu":
        device = torch.device("cpu")
    else:
        device = torch.device(proc_id)
        torch.cuda.set_device(device)
    if mode == "pure_gpu":
        g = g.to(F.cuda())
    # Only these two modes keep the seed indices on the host.
    if mode in ("cpu", "uva_cpu_indices"):
        indices = indices.cpu()
    else:
        indices = indices.cuda()

    # The CPU-only mode uses Gloo; every GPU mode uses NCCL.
    dist.init_process_group(
        "nccl" if mode != "cpu" else "gloo",
        "tcp://127.0.0.1:12347",
        world_size=nprocs,
        rank=proc_id,
    )
    use_uva = mode.startswith("uva")
    # Batch size equals the graph's node count (presumably so each rank's
    # share of indices fits in one batch -- confirm against callers).
    batch_size = g.num_nodes()
    shuffle = False
    # Several worker counts are exercised only in "cpu" mode; GPU modes
    # always load in the main process (num_workers=0).
    for num_workers in [1, 4] if mode == "cpu" else [0]:
        dataloader = dgl.dataloading.DataLoader(
            g,
            indices,
            sampler,
            device=device,
            batch_size=batch_size,
            num_workers=num_workers,
            use_uva=use_uva,
            use_ddp=True,
            drop_last=drop_last,
            shuffle=shuffle,
        )
        # Seeded with 0 so the reduction below is defined even when this
        # rank yields no batches, or only blocks without edges.
        max_nid = [0]
        for _, _, blocks in dataloader:
            block = blocks[-1]
            _, o_dst = block.edges()
            dst_nodes_id = block.dstdata[dgl.NID][o_dst]
            # np.max raises ValueError on a zero-size array; a block with
            # no edges contributes no destination IDs, so skip it.
            if dst_nodes_id.numel() > 0:
                max_nid.append(np.max(dst_nodes_id.cpu().numpy()))

        local_max = torch.tensor(np.max(max_nid))
        # NCCL collectives operate on CUDA tensors.
        if torch.distributed.get_backend() == "nccl":
            local_max = local_max.cuda()
        # Only rank 0 (the reduce destination) receives the global maximum.
        dist.reduce(local_max, 0, op=dist.ReduceOp.MAX)
        if proc_id == 0:
            if drop_last and not shuffle and local_max.item() > 0:
                assert (
                    local_max.item()
                    == len(indices)
                    - len(indices) % nprocs
                    - 1
                    - (len(indices) // nprocs) % batch_size
                )