Every hour, hourlyCompare picks blob names from a random point in the source, downloads up to hourlyBytes from the destination, and verifies them.
(hourlyBytes uint64)
| 1075 | // Every hour, hourlyCompare picks blob names from a random point in the source, |
| 1076 | // downloads up to hourlyBytes from the destination, and verifies them. |
| 1077 | func (sh *SyncHandler) hourlyCompare(hourlyBytes uint64) { |
| 1078 | ctx := context.TODO() |
| 1079 | ticker := time.NewTicker(time.Hour).C |
| 1080 | for { |
| 1081 | content := make([]byte, 16) |
| 1082 | if _, err := rand.Read(content); err != nil { |
| 1083 | panic(err) |
| 1084 | } |
| 1085 | after := blob.RefFromBytes(content).String() |
| 1086 | var roundBytes uint64 |
| 1087 | var roundBlobs int |
| 1088 | err := blobserver.EnumerateAllFrom(ctx, sh.from, after, func(sr blob.SizedRef) error { |
| 1089 | sh.mu.Lock() |
| 1090 | if _, ok := sh.needCopy[sr.Ref]; ok { |
| 1091 | sh.mu.Unlock() |
| 1092 | return nil // skip blobs in the copy queue |
| 1093 | } |
| 1094 | sh.mu.Unlock() |
| 1095 | |
| 1096 | if roundBytes+uint64(sr.Size) > hourlyBytes { |
| 1097 | return errStopEnumerating |
| 1098 | } |
| 1099 | blob, size, err := sh.to.(blob.Fetcher).Fetch(ctx, sr.Ref) |
| 1100 | if err != nil { |
| 1101 | return fmt.Errorf("error fetching %s: %v", sr.Ref, err) |
| 1102 | } |
| 1103 | if size != sr.Size { |
| 1104 | return fmt.Errorf("%s: expected size %d, got %d", sr.Ref, sr.Size, size) |
| 1105 | } |
| 1106 | h := sr.Ref.Hash() |
| 1107 | if _, err := io.Copy(h, blob); err != nil { |
| 1108 | return fmt.Errorf("error while reading %s: %v", sr.Ref, err) |
| 1109 | } |
| 1110 | if !sr.HashMatches(h) { |
| 1111 | return fmt.Errorf("expected %s, got %x", sr.Ref, h.Sum(nil)) |
| 1112 | } |
| 1113 | |
| 1114 | sh.mu.Lock() |
| 1115 | sh.comparedBlobs++ |
| 1116 | sh.comparedBytes += uint64(size) |
| 1117 | sh.compLastBlob = sr.Ref.String() |
| 1118 | sh.mu.Unlock() |
| 1119 | roundBlobs++ |
| 1120 | roundBytes += uint64(size) |
| 1121 | return nil |
| 1122 | }) |
| 1123 | sh.mu.Lock() |
| 1124 | if err != nil && err != errStopEnumerating { |
| 1125 | sh.compareErrors = append(sh.compareErrors, fmt.Sprintf("%s %v", time.Now(), err)) |
| 1126 | sh.logf("!! hourly compare error !!: %v", err) |
| 1127 | } |
| 1128 | sh.comparedRounds++ |
| 1129 | sh.mu.Unlock() |
| 1130 | sh.logf("compared %d blobs (%d bytes)", roundBlobs, roundBytes) |
| 1131 | <-ticker |
| 1132 | } |
| 1133 | } |
| 1134 |
no test coverage detected