MCPcopy Index your code
hub / github.com/dgraph-io/dgraph / processLoadFile

Method processLoadFile

dgraph/cmd/live/run.go:470–585  ·  view source on GitHub ↗
(ctx context.Context, rd *bufio.Reader, ck chunker.Chunker)

Source from the content-addressed store, hash-verified

468}
469
470func (l *loader) processLoadFile(ctx context.Context, rd *bufio.Reader, ck chunker.Chunker) error {
471 nqbuf := ck.NQuads()
472 errCh := make(chan error, 1)
473 // Spin a goroutine to push NQuads to mutation channel.
474 go func() {
475 var err error
476 defer func() {
477 errCh <- err
478 }()
479 buffer := make([]*api.NQuad, 0, opt.bufferSize*opt.batchSize)
480
481 drain := func() {
482 // We collect opt.bufferSize requests and preprocess them. For the requests
483 // to not conflict between themselves, we sort them on the basis of their predicates.
484 // Predicates with count index will conflict among themselves, so we keep them at
485 // end, making room for other predicates to load quickly.
486 sort.Slice(buffer, func(i, j int) bool {
487 iPred := sch.preds[x.NamespaceAttr(buffer[i].Namespace, buffer[i].Predicate)]
488 jPred := sch.preds[x.NamespaceAttr(buffer[j].Namespace, buffer[j].Predicate)]
489 t := func(a *Predicate) int {
490 if a != nil && a.Count {
491 return 1
492 }
493 return 0
494 }
495
496 // Sorts the nquads on basis of their predicates, while keeping the
497 // predicates with count index later than those without it.
498 if t(iPred) != t(jPred) {
499 return t(iPred) < t(jPred)
500 }
501 return buffer[i].Predicate < buffer[j].Predicate
502 })
503 for len(buffer) > 0 {
504 sz := opt.batchSize
505 if len(buffer) < opt.batchSize {
506 sz = len(buffer)
507 }
508 mu := &request{Mutation: &api.Mutation{Set: buffer[:sz]}}
509 l.reqs <- mu
510 buffer = buffer[sz:]
511 }
512 }
513
514 for nqs := range nqbuf.Ch() {
515 if len(nqs) == 0 {
516 continue
517 }
518
519 for _, nq := range nqs {
520 if !opt.preserveNs {
521 // If do not preserve namespace, use the namespace passed through
522 // `--force-namespace` flag.
523 nq.Namespace = opt.namespaceToLoad
524 }
525 if _, ok := l.namespaces[nq.Namespace]; !ok {
526 err = errors.Errorf("Cannot load nquad:%+v as its namespace doesn't exist.", nq)
527 return

Callers 1

processFileMethod · 0.95

Calls 13

allocateUidsMethod · 0.95
upsertUidsMethod · 0.95
uidMethod · 0.95
NamespaceAttrFunction · 0.92
CheckFunction · 0.92
SliceMethod · 0.80
ChMethod · 0.80
NQuadsMethod · 0.65
ChunkMethod · 0.65
ParseMethod · 0.65
FlushMethod · 0.65
ErrorfMethod · 0.45

Tested by

no test coverage detected