MCPcopy
hub / github.com/perkeep/perkeep / validateShardPrefix

Method validateShardPrefix

pkg/server/sync.go:776–831  ·  view source on GitHub ↗
(pfx string)

Source from the content-addressed store, hash-verified

774}
775
776func (sh *SyncHandler) validateShardPrefix(pfx string) (err error) {
777 defer func() {
778 sh.mu.Lock()
779 if err != nil {
780 errs := fmt.Sprintf("Failed to validate prefix %s: %v", pfx, err)
781 sh.logf("%s", errs)
782 sh.vshardErrs = append(sh.vshardErrs, errs)
783 }
784 sh.vshardDone++
785 sh.mu.Unlock()
786 }()
787 ctx, cancel := context.WithCancel(context.TODO())
788 defer cancel()
789 src, serrc := sh.startValidatePrefix(ctx, pfx, false)
790 dst, derrc := sh.startValidatePrefix(ctx, pfx, true)
791 srcErr := &chanError{
792 C: serrc,
793 Wrap: func(err error) error {
794 return fmt.Errorf("Error enumerating source %s for validating shard %s: %v", sh.fromName, pfx, err)
795 },
796 }
797 dstErr := &chanError{
798 C: derrc,
799 Wrap: func(err error) error {
800 return fmt.Errorf("Error enumerating target %s for validating shard %s: %v", sh.toName, pfx, err)
801 },
802 }
803
804 missingc := make(chan blob.SizedRef, 8)
805 go blobserver.ListMissingDestinationBlobs(missingc, func(blob.Ref) {}, src, dst)
806
807 var missing []blob.SizedRef
808 for sb := range missingc {
809 missing = append(missing, sb)
810 }
811
812 if err := srcErr.Get(); err != nil {
813 return err
814 }
815 if err := dstErr.Get(); err != nil {
816 return err
817 }
818
819 for _, sb := range missing {
820 if enqErr := sh.enqueue(sb); enqErr != nil {
821 if err == nil {
822 err = enqErr
823 }
824 } else {
825 sh.mu.Lock()
826 sh.vmissing++
827 sh.mu.Unlock()
828 }
829 }
830 return err
831}
832
833var errNotPrefix = errors.New("sentinel error: hit blob into the next shard")

Callers 1

runFullValidationMethod · 0.95

Calls 7

logfMethod · 0.95
startValidatePrefixMethod · 0.95
GetMethod · 0.95
enqueueMethod · 0.95
LockMethod · 0.80
UnlockMethod · 0.80

Tested by

no test coverage detected