MCPcopy
hub / github.com/dgraph-io/dgraph / copyPredicateWithBatchCount

Method copyPredicateWithBatchCount

dgraph/cmd/bulk/reduce.go:552–599  ·  view source on GitHub ↗

copyPredicateWithBatchCount is like copyPredicateWithBatch but returns the count of entries copied.

(destDb *badger.DB, srcDb *badger.DB, pred string)

Source from the content-addressed store, hash-verified

550
551// copyPredicateWithBatchCount is like copyPredicateWithBatch but returns the count of entries copied.
552func (r *reducer) copyPredicateWithBatchCount(destDb *badger.DB, srcDb *badger.DB, pred string) int {
553 wb := destDb.NewManagedWriteBatch()
554 defer wb.Cancel()
555
556 stream := srcDb.NewStreamAt(math.MaxUint64)
557 stream.LogPrefix = fmt.Sprintf("copying predicate %s", pred)
558 stream.Prefix = x.PredicatePrefix(pred)
559 stream.NumGo = 1
560
561 var count int
562 stream.Send = func(buf *z.Buffer) error {
563 kvs, err := badger.BufferToKVList(buf)
564 if err != nil {
565 return err
566 }
567
568 for _, kv := range kvs.Kv {
569 userMeta := byte(0)
570 if len(kv.UserMeta) > 0 {
571 userMeta = kv.UserMeta[0]
572 }
573
574 entry := &badger.Entry{
575 Key: kv.Key,
576 Value: kv.Value,
577 UserMeta: userMeta,
578 }
579
580 if err := wb.SetEntryAt(entry, kv.Version); err != nil {
581 return fmt.Errorf("error writing to batch: %w", err)
582 }
583 count++
584 }
585 return nil
586 }
587
588 if err := stream.Orchestrate(context.Background()); err != nil {
589 glog.Warningf("Error streaming predicate %s: %v", pred, err)
590 return 0
591 }
592
593 if err := wb.Flush(); err != nil {
594 glog.Errorf("Error flushing batch for predicate %s: %v", pred, err)
595 return 0
596 }
597
598 return count
599}
600
601const limit = 2 << 30
602

Callers 1

Calls 4

PredicatePrefixFunction · 0.92
WarningfMethod · 0.80
FlushMethod · 0.65
ErrorfMethod · 0.45

Tested by

no test coverage detected