copyPredicateWithBatchCount is like copyPredicateWithBatch but returns the count of entries copied.
(destDb *badger.DB, srcDb *badger.DB, pred string)
| 550 | |
| 551 | // copyPredicateWithBatchCount is like copyPredicateWithBatch but returns the count of entries copied. |
| 552 | func (r *reducer) copyPredicateWithBatchCount(destDb *badger.DB, srcDb *badger.DB, pred string) int { |
| 553 | wb := destDb.NewManagedWriteBatch() |
| 554 | defer wb.Cancel() |
| 555 | |
| 556 | stream := srcDb.NewStreamAt(math.MaxUint64) |
| 557 | stream.LogPrefix = fmt.Sprintf("copying predicate %s", pred) |
| 558 | stream.Prefix = x.PredicatePrefix(pred) |
| 559 | stream.NumGo = 1 |
| 560 | |
| 561 | var count int |
| 562 | stream.Send = func(buf *z.Buffer) error { |
| 563 | kvs, err := badger.BufferToKVList(buf) |
| 564 | if err != nil { |
| 565 | return err |
| 566 | } |
| 567 | |
| 568 | for _, kv := range kvs.Kv { |
| 569 | userMeta := byte(0) |
| 570 | if len(kv.UserMeta) > 0 { |
| 571 | userMeta = kv.UserMeta[0] |
| 572 | } |
| 573 | |
| 574 | entry := &badger.Entry{ |
| 575 | Key: kv.Key, |
| 576 | Value: kv.Value, |
| 577 | UserMeta: userMeta, |
| 578 | } |
| 579 | |
| 580 | if err := wb.SetEntryAt(entry, kv.Version); err != nil { |
| 581 | return fmt.Errorf("error writing to batch: %w", err) |
| 582 | } |
| 583 | count++ |
| 584 | } |
| 585 | return nil |
| 586 | } |
| 587 | |
| 588 | if err := stream.Orchestrate(context.Background()); err != nil { |
| 589 | glog.Warningf("Error streaming predicate %s: %v", pred, err) |
| 590 | return 0 |
| 591 | } |
| 592 | |
| 593 | if err := wb.Flush(); err != nil { |
| 594 | glog.Errorf("Error flushing batch for predicate %s: %v", pred, err) |
| 595 | return 0 |
| 596 | } |
| 597 | |
| 598 | return count |
| 599 | } |
| 600 | |
// limit is the untyped constant 2 << 30, i.e. 2^31 = 2147483648.
// NOTE(review): the code that reads it lies outside this chunk; presumably a
// byte-size threshold (2 GiB) — confirm at the call site.
| 601 | const limit = 2 << 30 |
| 602 |
no test coverage detected