MCPcopy
hub / github.com/perkeep/perkeep / doPass

Method doPass

cmd/pk/sync.go:337–476  ·  view source on GitHub ↗

src: non-nil source dest: non-nil destination thirdLeg: optional third-leg client. if not nil, anything on src but not on dest will instead be copied to thirdLeg, instead of directly to dest. (sneakernet mode, copying to a portable drive and transporting thirdLeg to dest)

(src, dest, thirdLeg blobserver.Storage)

Source from the content-addressed store, hash-verified

335// directly to dest. (sneakernet mode, copying to a portable drive
336// and transporting thirdLeg to dest)
337func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStats, retErr error) {
338 var statsMu sync.Mutex // guards stats return value
339
340 srcBlobs := make(chan blob.SizedRef, 100)
341 destBlobs := make(chan blob.SizedRef, 100)
342 srcErr := make(chan error, 1)
343 destErr := make(chan error, 1)
344
345 ctx := context.TODO()
346 enumCtx, cancel := context.WithCancel(ctx) // used for all (2 or 3) enumerates
347 defer cancel()
348 enumerate := func(errc chan<- error, sto blobserver.Storage, blobc chan<- blob.SizedRef) {
349 err := enumerateAllBlobs(enumCtx, sto, blobc)
350 if err != nil {
351 cancel()
352 }
353 errc <- err
354 }
355
356 go enumerate(srcErr, src, srcBlobs)
357 checkSourceError := func() {
358 if err := <-srcErr; err != nil && err != context.Canceled {
359 retErr = fmt.Errorf("Enumerate error from source: %v", err)
360 }
361 }
362
363 if c.dest == "stdout" {
364 for sb := range srcBlobs {
365 fmt.Fprintf(cmdmain.Stdout, "%s %d\n", sb.Ref, sb.Size)
366 }
367 checkSourceError()
368 return
369 }
370
371 if c.wipe {
372 // TODO(mpl): dest is a client. make it send a "wipe" request?
373 // upon reception its server then wipes itself if it is a wiper.
374 log.Fatal("Index wiping not yet supported.")
375 }
376
377 go enumerate(destErr, dest, destBlobs)
378 checkDestError := func() {
379 if err := <-destErr; err != nil && err != context.Canceled {
380 retErr = fmt.Errorf("Enumerate error from destination: %v", err)
381 }
382 }
383
384 destNotHaveBlobs := make(chan blob.SizedRef)
385
386 readSrcBlobs := srcBlobs
387 if *cmdmain.FlagVerbose {
388 readSrcBlobs = loggingBlobRefChannel(srcBlobs)
389 }
390
391 mismatches := []blob.Ref{}
392
393 logErrorf := func(format string, args ...interface{}) {
394 log.Printf(format, args...)

Callers 2

RunCommandMethod · 0.95
syncAllMethod · 0.95

Calls 13

ReceiveFunction · 0.92
enumerateAllBlobsFunction · 0.85
loggingBlobRefChannelFunction · 0.85
FatalMethod · 0.80
PrintfMethod · 0.80
LockMethod · 0.80
UnlockMethod · 0.80
WaitMethod · 0.80
StartMethod · 0.65
FetchMethod · 0.65
RemoveBlobsMethod · 0.65

Tested by

no test coverage detected