MCPcopy
hub / github.com/perkeep/perkeep / hourlyCompare

Method hourlyCompare

pkg/server/sync.go:1077–1133  ·  view source on GitHub ↗

Every hour, hourlyCompare picks blob names from a random point in the source, downloads up to hourlyBytes from the destination, and verifies them.

(hourlyBytes uint64)

Source from the content-addressed store, hash-verified

1075// Every hour, hourlyCompare picks blob names from a random point in the source,
1076// downloads up to hourlyBytes from the destination, and verifies them.
1077func (sh *SyncHandler) hourlyCompare(hourlyBytes uint64) {
1078 ctx := context.TODO()
1079 ticker := time.NewTicker(time.Hour).C
1080 for {
1081 content := make([]byte, 16)
1082 if _, err := rand.Read(content); err != nil {
1083 panic(err)
1084 }
1085 after := blob.RefFromBytes(content).String()
1086 var roundBytes uint64
1087 var roundBlobs int
1088 err := blobserver.EnumerateAllFrom(ctx, sh.from, after, func(sr blob.SizedRef) error {
1089 sh.mu.Lock()
1090 if _, ok := sh.needCopy[sr.Ref]; ok {
1091 sh.mu.Unlock()
1092 return nil // skip blobs in the copy queue
1093 }
1094 sh.mu.Unlock()
1095
1096 if roundBytes+uint64(sr.Size) > hourlyBytes {
1097 return errStopEnumerating
1098 }
1099 blob, size, err := sh.to.(blob.Fetcher).Fetch(ctx, sr.Ref)
1100 if err != nil {
1101 return fmt.Errorf("error fetching %s: %v", sr.Ref, err)
1102 }
1103 if size != sr.Size {
1104 return fmt.Errorf("%s: expected size %d, got %d", sr.Ref, sr.Size, size)
1105 }
1106 h := sr.Ref.Hash()
1107 if _, err := io.Copy(h, blob); err != nil {
1108 return fmt.Errorf("error while reading %s: %v", sr.Ref, err)
1109 }
1110 if !sr.HashMatches(h) {
1111 return fmt.Errorf("expected %s, got %x", sr.Ref, h.Sum(nil))
1112 }
1113
1114 sh.mu.Lock()
1115 sh.comparedBlobs++
1116 sh.comparedBytes += uint64(size)
1117 sh.compLastBlob = sr.Ref.String()
1118 sh.mu.Unlock()
1119 roundBlobs++
1120 roundBytes += uint64(size)
1121 return nil
1122 })
1123 sh.mu.Lock()
1124 if err != nil && err != errStopEnumerating {
1125 sh.compareErrors = append(sh.compareErrors, fmt.Sprintf("%s %v", time.Now(), err))
1126 sh.logf("!! hourly compare error !!: %v", err)
1127 }
1128 sh.comparedRounds++
1129 sh.mu.Unlock()
1130 sh.logf("compared %d blobs (%d bytes)", roundBlobs, roundBytes)
1131 <-ticker
1132 }
1133}
1134

Callers 1

newSyncFromConfigFunction · 0.80

Calls 11

logfMethod · 0.95
RefFromBytesFunction · 0.92
EnumerateAllFromFunction · 0.92
LockMethod · 0.80
UnlockMethod · 0.80
SumMethod · 0.80
FetchMethod · 0.65
HashMethod · 0.65
ReadMethod · 0.45
StringMethod · 0.45
HashMatchesMethod · 0.45

Tested by

no test coverage detected