(ctx context.Context, rd *bufio.Reader, ck chunker.Chunker)
| 468 | } |
| 469 | |
| 470 | func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunker.Chunker) error { |
| 471 | nqbuf := ck.NQuads() |
| 472 | errCh := make(chan error, 1) |
| 473 | // Spin a goroutine to push NQuads to mutation channel. |
| 474 | go func() { |
| 475 | var err error |
| 476 | defer func() { |
| 477 | errCh <- err |
| 478 | }() |
| 479 | buffer := make([]*api.NQuad, 0, opt.bufferSize*opt.batchSize) |
| 480 | |
| 481 | drain := func() { |
| 482 | // We collect opt.bufferSize requests and preprocess them. So that the requests |
| 483 | // do not conflict with one another, we sort them by their predicates. |
| 484 | // Predicates with a count index will conflict among themselves, so we keep them at |
| 485 | // the end, making room for other predicates to load quickly. |
| 486 | sort.Slice(buffer, func(i, j int) bool { |
| 487 | iPred := sch.preds[x.NamespaceAttr(buffer[i].Namespace, buffer[i].Predicate)] |
| 488 | jPred := sch.preds[x.NamespaceAttr(buffer[j].Namespace, buffer[j].Predicate)] |
| 489 | t := func(a *Predicate) int { |
| 490 | if a != nil && a.Count { |
| 491 | return 1 |
| 492 | } |
| 493 | return 0 |
| 494 | } |
| 495 | |
| 496 | // Sorts the nquads on the basis of their predicates, while keeping the |
| 497 | // predicates with a count index later than those without it. |
| 498 | if t(iPred) != t(jPred) { |
| 499 | return t(iPred) < t(jPred) |
| 500 | } |
| 501 | return buffer[i].Predicate < buffer[j].Predicate |
| 502 | }) |
| 503 | for len(buffer) > 0 { |
| 504 | sz := opt.batchSize |
| 505 | if len(buffer) < opt.batchSize { |
| 506 | sz = len(buffer) |
| 507 | } |
| 508 | mu := &request{Mutation: &api.Mutation{Set: buffer[:sz]}} |
| 509 | l.reqs <- mu |
| 510 | buffer = buffer[sz:] |
| 511 | } |
| 512 | } |
| 513 | |
| 514 | for nqs := range nqbuf.Ch() { |
| 515 | if len(nqs) == 0 { |
| 516 | continue |
| 517 | } |
| 518 | |
| 519 | for _, nq := range nqs { |
| 520 | if !opt.preserveNs { |
| 521 | // If namespaces are not being preserved, use the namespace passed through the |
| 522 | // `--force-namespace` flag. |
| 523 | nq.Namespace = opt.namespaceToLoad |
| 524 | } |
| 525 | if _, ok := l.namespaces[nq.Namespace]; !ok { |
| 526 | err = errors.Errorf("Cannot load nquad:%+v as its namespace doesn't exist.", nq) |
| 527 | return |
no test coverage detected