serve starts the progress emitter which starts emitting DownloadProgress events as the progress happens.
(ctx context.Context)
| 64 | // serve starts the progress emitter which starts emitting DownloadProgress |
| 65 | // events as the progress happens. |
| 66 | func (t *ProgressEmitter) Serve(ctx context.Context) error { |
| 67 | t.cfg.Subscribe(t) |
| 68 | defer t.cfg.Unsubscribe(t) |
| 69 | |
| 70 | var lastUpdate time.Time |
| 71 | var lastCount, newCount int |
| 72 | for { |
| 73 | select { |
| 74 | case <-ctx.Done(): |
| 75 | slog.Debug("Progress emitter: stopping") |
| 76 | return nil |
| 77 | case <-t.timer.C: |
| 78 | t.mut.Lock() |
| 79 | |
| 80 | newLastUpdated := lastUpdate |
| 81 | newCount = t.lenRegistryLocked() |
| 82 | var progressUpdates []progressUpdate |
| 83 | for _, pullers := range t.registry { |
| 84 | for _, puller := range pullers { |
| 85 | if updated := puller.Updated(); updated.After(newLastUpdated) { |
| 86 | newLastUpdated = updated |
| 87 | } |
| 88 | } |
| 89 | } |
| 90 | |
| 91 | if !newLastUpdated.Equal(lastUpdate) || newCount != lastCount { |
| 92 | lastUpdate = newLastUpdated |
| 93 | lastCount = newCount |
| 94 | t.sendDownloadProgressEventLocked() |
| 95 | progressUpdates = t.computeProgressUpdates() |
| 96 | } |
| 97 | |
| 98 | if newCount != 0 { |
| 99 | t.timer.Reset(t.interval) |
| 100 | } |
| 101 | t.mut.Unlock() |
| 102 | |
| 103 | // Do the sending outside of the lock. |
| 104 | // If these send block, the whole process of reporting progress to others stops, but that's probably fine. |
| 105 | // It's better to stop this component from working under back-pressure than causing other components that |
| 106 | // rely on this component to be waiting for locks. |
| 107 | // |
| 108 | // This might leave remote peers in some funky state where we are unable the fact that we no longer have |
| 109 | // something, but there is not much we can do here. |
| 110 | for _, update := range progressUpdates { |
| 111 | update.send(ctx) |
| 112 | } |
| 113 | } |
| 114 | } |
| 115 | } |
| 116 | |
| 117 | func (t *ProgressEmitter) sendDownloadProgressEventLocked() { |
| 118 | output := make(map[string]map[string]*PullerProgress) |