src: non-nil source. dest: non-nil destination. thirdLeg: optional third-leg client; if not nil, anything on src but not on dest will be copied to thirdLeg instead of directly to dest (sneakernet mode: copying to a portable drive and transporting thirdLeg to dest).
(src, dest, thirdLeg blobserver.Storage)
| 335 | // directly to dest. (sneakernet mode, copying to a portable drive |
| 336 | // and transporting thirdLeg to dest) |
| 337 | func (c *syncCmd) doPass(src, dest, thirdLeg blobserver.Storage) (stats SyncStats, retErr error) { |
| 338 | var statsMu sync.Mutex // guards stats return value |
| 339 | |
| 340 | srcBlobs := make(chan blob.SizedRef, 100) |
| 341 | destBlobs := make(chan blob.SizedRef, 100) |
| 342 | srcErr := make(chan error, 1) |
| 343 | destErr := make(chan error, 1) |
| 344 | |
| 345 | ctx := context.TODO() |
| 346 | enumCtx, cancel := context.WithCancel(ctx) // used for all (2 or 3) enumerates |
| 347 | defer cancel() |
| 348 | enumerate := func(errc chan<- error, sto blobserver.Storage, blobc chan<- blob.SizedRef) { |
| 349 | err := enumerateAllBlobs(enumCtx, sto, blobc) |
| 350 | if err != nil { |
| 351 | cancel() |
| 352 | } |
| 353 | errc <- err |
| 354 | } |
| 355 | |
| 356 | go enumerate(srcErr, src, srcBlobs) |
| 357 | checkSourceError := func() { |
| 358 | if err := <-srcErr; err != nil && err != context.Canceled { |
| 359 | retErr = fmt.Errorf("Enumerate error from source: %v", err) |
| 360 | } |
| 361 | } |
| 362 | |
| 363 | if c.dest == "stdout" { |
| 364 | for sb := range srcBlobs { |
| 365 | fmt.Fprintf(cmdmain.Stdout, "%s %d\n", sb.Ref, sb.Size) |
| 366 | } |
| 367 | checkSourceError() |
| 368 | return |
| 369 | } |
| 370 | |
| 371 | if c.wipe { |
| 372 | // TODO(mpl): dest is a client. make it send a "wipe" request? |
| 373 | // upon reception its server then wipes itself if it is a wiper. |
| 374 | log.Fatal("Index wiping not yet supported.") |
| 375 | } |
| 376 | |
| 377 | go enumerate(destErr, dest, destBlobs) |
| 378 | checkDestError := func() { |
| 379 | if err := <-destErr; err != nil && err != context.Canceled { |
| 380 | retErr = fmt.Errorf("Enumerate error from destination: %v", err) |
| 381 | } |
| 382 | } |
| 383 | |
| 384 | destNotHaveBlobs := make(chan blob.SizedRef) |
| 385 | |
| 386 | readSrcBlobs := srcBlobs |
| 387 | if *cmdmain.FlagVerbose { |
| 388 | readSrcBlobs = loggingBlobRefChannel(srcBlobs) |
| 389 | } |
| 390 | |
| 391 | mismatches := []blob.Ref{} |
| 392 | |
| 393 | logErrorf := func(format string, args ...interface{}) { |
| 394 | log.Printf(format, args...) |
no test coverage detected