(pfx string)
| 774 | } |
| 775 | |
| 776 | func (sh *SyncHandler) validateShardPrefix(pfx string) (err error) { |
| 777 | defer func() { |
| 778 | sh.mu.Lock() |
| 779 | if err != nil { |
| 780 | errs := fmt.Sprintf("Failed to validate prefix %s: %v", pfx, err) |
| 781 | sh.logf("%s", errs) |
| 782 | sh.vshardErrs = append(sh.vshardErrs, errs) |
| 783 | } |
| 784 | sh.vshardDone++ |
| 785 | sh.mu.Unlock() |
| 786 | }() |
| 787 | ctx, cancel := context.WithCancel(context.TODO()) |
| 788 | defer cancel() |
| 789 | src, serrc := sh.startValidatePrefix(ctx, pfx, false) |
| 790 | dst, derrc := sh.startValidatePrefix(ctx, pfx, true) |
| 791 | srcErr := &chanError{ |
| 792 | C: serrc, |
| 793 | Wrap: func(err error) error { |
| 794 | return fmt.Errorf("Error enumerating source %s for validating shard %s: %v", sh.fromName, pfx, err) |
| 795 | }, |
| 796 | } |
| 797 | dstErr := &chanError{ |
| 798 | C: derrc, |
| 799 | Wrap: func(err error) error { |
| 800 | return fmt.Errorf("Error enumerating target %s for validating shard %s: %v", sh.toName, pfx, err) |
| 801 | }, |
| 802 | } |
| 803 | |
| 804 | missingc := make(chan blob.SizedRef, 8) |
| 805 | go blobserver.ListMissingDestinationBlobs(missingc, func(blob.Ref) {}, src, dst) |
| 806 | |
| 807 | var missing []blob.SizedRef |
| 808 | for sb := range missingc { |
| 809 | missing = append(missing, sb) |
| 810 | } |
| 811 | |
| 812 | if err := srcErr.Get(); err != nil { |
| 813 | return err |
| 814 | } |
| 815 | if err := dstErr.Get(); err != nil { |
| 816 | return err |
| 817 | } |
| 818 | |
| 819 | for _, sb := range missing { |
| 820 | if enqErr := sh.enqueue(sb); enqErr != nil { |
| 821 | if err == nil { |
| 822 | err = enqErr |
| 823 | } |
| 824 | } else { |
| 825 | sh.mu.Lock() |
| 826 | sh.vmissing++ |
| 827 | sh.mu.Unlock() |
| 828 | } |
| 829 | } |
| 830 | return err |
| 831 | } |
| 832 | |
| 833 | var errNotPrefix = errors.New("sentinel error: hit blob into the next shard") |
no test coverage detected