enumerateQueuedBlobs yields blobs from the on-disk sorted.KeyValue store. This differs from enumeratePendingBlobs, which sends from the in-memory pending list.
(dst chan<- blob.SizedRef, intr <-chan struct{})
| 541 | // enumerateQueuedBlobs yields blobs from the on-disk sorted.KeyValue store. |
| 542 | // This differs from enumeratePendingBlobs, which sends from the in-memory pending list. |
| 543 | func (sh *SyncHandler) enumerateQueuedBlobs(dst chan<- blob.SizedRef, intr <-chan struct{}) error { |
| 544 | defer close(dst) |
| 545 | it := sh.queue.Find("", "") |
| 546 | for it.Next() { |
| 547 | br, ok := blob.Parse(it.Key()) |
| 548 | size, err := strconv.ParseUint(it.Value(), 10, 32) |
| 549 | if !ok || err != nil { |
| 550 | sh.logf("ERROR: bogus sync queue entry: %q => %q", it.Key(), it.Value()) |
| 551 | continue |
| 552 | } |
| 553 | select { |
| 554 | case dst <- blob.SizedRef{Ref: br, Size: uint32(size)}: |
| 555 | case <-intr: |
| 556 | return it.Close() |
| 557 | } |
| 558 | } |
| 559 | return it.Close() |
| 560 | } |
| 561 | |
| 562 | func (sh *SyncHandler) runSync(syncType string, enumSrc func(chan<- blob.SizedRef, <-chan struct{}) error) int { |
| 563 | enumch := make(chan blob.SizedRef, 8) |