| 499 | } |
| 500 | |
| 501 | func (c *LocalCluster) BulkLoad(opts BulkOpts) error { |
| 502 | zeroURL, err := c.zeros[0].zeroURL(c) |
| 503 | if err != nil { |
| 504 | return errors.Wrap(err, "error finding URL of first zero") |
| 505 | } |
| 506 | |
| 507 | var outDir string |
| 508 | if opts.OutDir != "" { |
| 509 | outDir = opts.OutDir |
| 510 | } else { |
| 511 | outDir = c.conf.bulkOutDirForMount |
| 512 | } |
| 513 | |
| 514 | // Determine shard counts - use explicit values if provided, otherwise calculate from cluster config |
| 515 | mapShards := opts.MapShards |
| 516 | reduceShards := opts.ReduceShards |
| 517 | if mapShards == 0 { |
| 518 | mapShards = c.conf.numAlphas / c.conf.replicas |
| 519 | } |
| 520 | if reduceShards == 0 { |
| 521 | reduceShards = c.conf.numAlphas / c.conf.replicas |
| 522 | } |
| 523 | |
| 524 | args := []string{"bulk", |
| 525 | "--store_xids=true", |
| 526 | "--zero", zeroURL, |
| 527 | "--reduce_shards", strconv.Itoa(reduceShards), |
| 528 | "--map_shards", strconv.Itoa(mapShards), |
| 529 | "--out", outDir, |
| 530 | // we had to create the dir for setting up docker, hence, replacing it here. |
| 531 | "--replace_out", |
| 532 | // Use :0 to let OS assign random available port for pprof, avoids conflicts in tests |
| 533 | "--http", ":0", |
| 534 | } |
| 535 | |
| 536 | if opts.TmpDir != "" { |
| 537 | args = append(args, "--tmp", opts.TmpDir) |
| 538 | } |
| 539 | if opts.SkipReducePhase { |
| 540 | args = append(args, "--skip_reduce_phase") |
| 541 | } |
| 542 | if opts.SkipMapPhase { |
| 543 | args = append(args, "--skip_map_phase") |
| 544 | } |
| 545 | |
| 546 | if len(opts.DataFiles) > 0 { |
| 547 | args = append(args, "-f", strings.Join(opts.DataFiles, ",")) |
| 548 | } |
| 549 | if len(opts.SchemaFiles) > 0 { |
| 550 | args = append(args, "-s", strings.Join(opts.SchemaFiles, ",")) |
| 551 | } |
| 552 | if len(opts.GQLSchemaFiles) > 0 { |
| 553 | args = append(args, "-g", strings.Join(opts.GQLSchemaFiles, ",")) |
| 554 | } |
| 555 | |
| 556 | log.Printf("[INFO] running bulk loader with args: [%v]", strings.Join(args, " ")) |
| 557 | cmd := exec.Command(c.HostDgraphBinaryPath(), args...) |
| 558 | if out, err := cmd.CombinedOutput(); err != nil { |