(syncType string, enumSrc func(chan<- blob.SizedRef, <-chan struct{}) error)
| 560 | } |
| 561 | |
| 562 | func (sh *SyncHandler) runSync(syncType string, enumSrc func(chan<- blob.SizedRef, <-chan struct{}) error) int { |
| 563 | enumch := make(chan blob.SizedRef, 8) |
| 564 | errch := make(chan error, 1) |
| 565 | intr := make(chan struct{}) |
| 566 | defer close(intr) |
| 567 | go func() { errch <- enumSrc(enumch, intr) }() |
| 568 | |
| 569 | nCopied := 0 |
| 570 | toCopy := 0 |
| 571 | |
| 572 | workch := make(chan blob.SizedRef, 1000) |
| 573 | resch := make(chan copyResult, 8) |
| 574 | FeedWork: |
| 575 | for sb := range enumch { |
| 576 | if toCopy < sh.copierPoolSize { |
| 577 | go sh.copyWorker(resch, workch) |
| 578 | } |
| 579 | select { |
| 580 | case workch <- sb: |
| 581 | toCopy++ |
| 582 | default: |
| 583 | // Buffer full. Enough for this batch. Will get it later. |
| 584 | break FeedWork |
| 585 | } |
| 586 | } |
| 587 | close(workch) |
| 588 | for i := 0; i < toCopy; i++ { |
| 589 | sh.setStatusf("Copying blobs") |
| 590 | res := <-resch |
| 591 | if res.err == nil { |
| 592 | nCopied++ |
| 593 | } |
| 594 | } |
| 595 | |
| 596 | if err := <-errch; err != nil { |
| 597 | sh.logf("error enumerating for %v sync: %v", syncType, err) |
| 598 | } |
| 599 | return nCopied |
| 600 | } |
| 601 | |
| 602 | func (sh *SyncHandler) syncLoop() { |
| 603 | for { |
no test coverage detected