LiveLoadFromExport runs the live loader from the output of dgraph export The exportDir is the directory present inside the container. This function first copies all the files on the host and then runs the live loader.
(exportDir string)
| 391 | // The exportDir is the directory present inside the container. This function |
| 392 | // first copies all the files on the host and then runs the live loader. |
| 393 | func (c *LocalCluster) LiveLoadFromExport(exportDir string) error { |
| 394 | ctx, cancel := context.WithTimeout(context.Background(), requestTimeout) |
| 395 | defer cancel() |
| 396 | exportDirHost, err := os.MkdirTemp("", "dgraph-export") |
| 397 | if err != nil { |
| 398 | return errors.Wrap(err, "error creating temp dir for exported data") |
| 399 | } |
| 400 | defer func() { |
| 401 | if err := os.RemoveAll(exportDirHost); err != nil { |
| 402 | log.Printf("[WARNING] error removing export copy on the host: %v", err) |
| 403 | } |
| 404 | }() |
| 405 | |
| 406 | // we need to copy the exported data from the container to host |
| 407 | ts, _, err := c.dcli.CopyFromContainer(ctx, c.alphas[0].cid(), exportDir) |
| 408 | if err != nil { |
| 409 | return errors.Wrapf(err, "error copying export dir from container [%v]", c.alphas[0].cname()) |
| 410 | } |
| 411 | defer func() { |
| 412 | if err := ts.Close(); err != nil { |
| 413 | log.Printf("[WARNING] error closing tared stream from docker cp for [%v]", c.alphas[0].cname()) |
| 414 | } |
| 415 | }() |
| 416 | |
| 417 | // .rdf.gz, .schema.gz,.gql_schema.gz |
| 418 | var rdfFiles, schemaFiles, gqlSchemaFiles, jsonFiles []string |
| 419 | tr := tar.NewReader(ts) |
| 420 | for { |
| 421 | header, err := tr.Next() |
| 422 | if err == io.EOF { |
| 423 | break |
| 424 | } else if err != nil { |
| 425 | return errors.Wrapf(err, "error reading file in tared stream: [%+v]", header) |
| 426 | } |
| 427 | if header.Typeflag != tar.TypeReg { |
| 428 | continue |
| 429 | } |
| 430 | |
| 431 | fileName := filepath.Base(header.Name) |
| 432 | hostFile := filepath.Join(exportDirHost, fileName) |
| 433 | switch { |
| 434 | case strings.HasSuffix(fileName, ".rdf.gz"): |
| 435 | rdfFiles = append(rdfFiles, hostFile) |
| 436 | case strings.HasSuffix(fileName, ".json.gz"): |
| 437 | jsonFiles = append(jsonFiles, hostFile) |
| 438 | case strings.HasSuffix(fileName, ".schema.gz"): |
| 439 | schemaFiles = append(schemaFiles, hostFile) |
| 440 | case strings.HasSuffix(fileName, ".gql_schema.gz"): |
| 441 | gqlSchemaFiles = append(gqlSchemaFiles, hostFile) |
| 442 | default: |
| 443 | return errors.Errorf("found unexpected file in export: %v", fileName) |
| 444 | } |
| 445 | |
| 446 | fd, err := os.Create(hostFile) |
| 447 | if err != nil { |
| 448 | return errors.Wrapf(err, "error creating file [%v]", hostFile) |
| 449 | } |
| 450 | defer func() { |