(ctx context.Context, src blob.Reader, dst blob.Storage, blobsToCopy, blobsToDelete []blob.Metadata, totalBytes int64)
| 229 | } |
| 230 | |
| 231 | func (c *commandRepositorySyncTo) runSyncBlobs(ctx context.Context, src blob.Reader, dst blob.Storage, blobsToCopy, blobsToDelete []blob.Metadata, totalBytes int64) error { |
| 232 | eg, ctx := errgroup.WithContext(ctx) |
| 233 | copyCh := sliceToChannel(ctx, blobsToCopy) |
| 234 | deleteCh := sliceToChannel(ctx, blobsToDelete) |
| 235 | |
| 236 | var progressMutex sync.Mutex |
| 237 | |
| 238 | var totalCopied stats.CountSum |
| 239 | |
| 240 | tt := timetrack.Start() |
| 241 | |
| 242 | for workerID := range c.repositorySyncParallelism { |
| 243 | eg.Go(func() error { |
| 244 | for m := range copyCh { |
| 245 | log(ctx).Debugf("[%v] Copying %v (%v bytes)...\n", workerID, m.BlobID, m.Length) |
| 246 | |
| 247 | if err := c.syncCopyBlob(ctx, m, src, dst); err != nil { |
| 248 | return errors.Wrapf(err, "error copying %v", m.BlobID) |
| 249 | } |
| 250 | |
| 251 | numBlobs, bytesCopied := totalCopied.Add(m.Length) |
| 252 | eta := "unknown" |
| 253 | speed := "-" |
| 254 | |
| 255 | progressMutex.Lock() |
| 256 | |
| 257 | if est, ok := tt.Estimate(float64(bytesCopied), float64(totalBytes)); ok { |
| 258 | eta = fmt.Sprintf("%v (%v)", est.Remaining, formatTimestamp(est.EstimatedEndTime)) |
| 259 | speed = units.BytesPerSecondsString(est.SpeedPerSecond) |
| 260 | } |
| 261 | |
| 262 | c.outputSyncProgress( |
| 263 | fmt.Sprintf(" Copied %v blobs (%v), Speed: %v, ETA: %v", |
| 264 | numBlobs, units.BytesString(bytesCopied), speed, eta)) |
| 265 | |
| 266 | progressMutex.Unlock() |
| 267 | } |
| 268 | |
| 269 | for m := range deleteCh { |
| 270 | log(ctx).Debugf("[%v] Deleting %v (%v bytes)...\n", workerID, m.BlobID, m.Length) |
| 271 | |
| 272 | if err := syncDeleteBlob(ctx, m, dst); err != nil { |
| 273 | return errors.Wrapf(err, "error deleting %v", m.BlobID) |
| 274 | } |
| 275 | } |
| 276 | |
| 277 | return nil |
| 278 | }) |
| 279 | } |
| 280 | |
| 281 | if err := eg.Wait(); err != nil { |
| 282 | return errors.Wrap(err, "error copying blobs") |
| 283 | } |
| 284 | |
| 285 | return nil |
| 286 | } |
| 287 | |
| 288 | func sliceToChannel(ctx context.Context, md []blob.Metadata) chan blob.Metadata { |
no test coverage detected