MCPcopy
hub / github.com/kopia/kopia / runSyncBlobs

Method runSyncBlobs

cli/command_repository_sync.go:231–286  ·  view source on GitHub ↗
(ctx context.Context, src blob.Reader, dst blob.Storage, blobsToCopy, blobsToDelete []blob.Metadata, totalBytes int64)

Source from the content-addressed store, hash-verified

229}
230
231func (c *commandRepositorySyncTo) runSyncBlobs(ctx context.Context, src blob.Reader, dst blob.Storage, blobsToCopy, blobsToDelete []blob.Metadata, totalBytes int64) error {
232 eg, ctx := errgroup.WithContext(ctx)
233 copyCh := sliceToChannel(ctx, blobsToCopy)
234 deleteCh := sliceToChannel(ctx, blobsToDelete)
235
236 var progressMutex sync.Mutex
237
238 var totalCopied stats.CountSum
239
240 tt := timetrack.Start()
241
242 for workerID := range c.repositorySyncParallelism {
243 eg.Go(func() error {
244 for m := range copyCh {
245 log(ctx).Debugf("[%v] Copying %v (%v bytes)...\n", workerID, m.BlobID, m.Length)
246
247 if err := c.syncCopyBlob(ctx, m, src, dst); err != nil {
248 return errors.Wrapf(err, "error copying %v", m.BlobID)
249 }
250
251 numBlobs, bytesCopied := totalCopied.Add(m.Length)
252 eta := "unknown"
253 speed := "-"
254
255 progressMutex.Lock()
256
257 if est, ok := tt.Estimate(float64(bytesCopied), float64(totalBytes)); ok {
258 eta = fmt.Sprintf("%v (%v)", est.Remaining, formatTimestamp(est.EstimatedEndTime))
259 speed = units.BytesPerSecondsString(est.SpeedPerSecond)
260 }
261
262 c.outputSyncProgress(
263 fmt.Sprintf(" Copied %v blobs (%v), Speed: %v, ETA: %v",
264 numBlobs, units.BytesString(bytesCopied), speed, eta))
265
266 progressMutex.Unlock()
267 }
268
269 for m := range deleteCh {
270 log(ctx).Debugf("[%v] Deleting %v (%v bytes)...\n", workerID, m.BlobID, m.Length)
271
272 if err := syncDeleteBlob(ctx, m, dst); err != nil {
273 return errors.Wrapf(err, "error deleting %v", m.BlobID)
274 }
275 }
276
277 return nil
278 })
279 }
280
281 if err := eg.Wait(); err != nil {
282 return errors.Wrap(err, "error copying blobs")
283 }
284
285 return nil
286}
287
288func sliceToChannel(ctx context.Context, md []blob.Metadata) chan blob.Metadata {

Callers 1

runSyncWithStorageMethod · 0.95

Calls 13

syncCopyBlobMethod · 0.95
AddMethod · 0.95
outputSyncProgressMethod · 0.95
StartFunction · 0.92
BytesPerSecondsStringFunction · 0.92
BytesStringFunction · 0.92
sliceToChannelFunction · 0.85
formatTimestampFunction · 0.85
syncDeleteBlobFunction · 0.85
EstimateMethod · 0.80
LockMethod · 0.65
UnlockMethod · 0.65

Tested by

no test coverage detected