MCPcopy Index your code
hub / github.com/perkeep/perkeep / runSync

Method runSync

pkg/server/sync.go:562–600  ·  view source on GitHub ↗
(syncType string, enumSrc func(chan<- blob.SizedRef, <-chan struct{}) error)

Source from the content-addressed store, hash-verified

560}
561
562func (sh *SyncHandler) runSync(syncType string, enumSrc func(chan<- blob.SizedRef, <-chan struct{}) error) int {
563 enumch := make(chan blob.SizedRef, 8)
564 errch := make(chan error, 1)
565 intr := make(chan struct{})
566 defer close(intr)
567 go func() { errch <- enumSrc(enumch, intr) }()
568
569 nCopied := 0
570 toCopy := 0
571
572 workch := make(chan blob.SizedRef, 1000)
573 resch := make(chan copyResult, 8)
574FeedWork:
575 for sb := range enumch {
576 if toCopy < sh.copierPoolSize {
577 go sh.copyWorker(resch, workch)
578 }
579 select {
580 case workch <- sb:
581 toCopy++
582 default:
583 // Buffer full. Enough for this batch. Will get it later.
584 break FeedWork
585 }
586 }
587 close(workch)
588 for i := 0; i < toCopy; i++ {
589 sh.setStatusf("Copying blobs")
590 res := <-resch
591 if res.err == nil {
592 nCopied++
593 }
594 }
595
596 if err := <-errch; err != nil {
597 sh.logf("error enumerating for %v sync: %v", syncType, err)
598 }
599 return nCopied
600}
601
602func (sh *SyncHandler) syncLoop() {
603 for {

Callers 2

syncLoopMethod · 0.95
newSyncFromConfigFunction · 0.80

Calls 3

copyWorkerMethod · 0.95
setStatusfMethod · 0.95
logfMethod · 0.95

Tested by

no test coverage detected