()
| 144 | } |
| 145 | |
| 146 | func (s *LocalDownloadService) broadcastLoop() { |
| 147 | for msg := range s.InputCh { |
| 148 | s.listenerMu.Lock() |
| 149 | listenersCopy := make([]chan any, len(s.listeners)) |
| 150 | copy(listenersCopy, s.listeners) |
| 151 | s.listenerMu.Unlock() |
| 152 | |
| 153 | for _, ch := range listenersCopy { |
| 154 | // Check message type |
| 155 | isProgress := false |
| 156 | switch msg.(type) { |
| 157 | case events.ProgressMsg: |
| 158 | isProgress = true |
| 159 | case events.BatchProgressMsg: |
| 160 | isProgress = true |
| 161 | } |
| 162 | |
| 163 | func() { |
| 164 | defer func() { _ = recover() }() |
| 165 | if isProgress { |
| 166 | // Non-blocking send for progress updates |
| 167 | select { |
| 168 | case ch <- msg: |
| 169 | default: |
| 170 | // Drop progress message if channel is full |
| 171 | } |
| 172 | } else { |
| 173 | // Blocking send with timeout for critical state changes |
| 174 | // We don't want to drop these, but we also don't want to block forever if a client is dead |
| 175 | select { |
| 176 | case ch <- msg: |
| 177 | case <-time.After(1 * time.Second): |
| 178 | utils.Debug("Dropped critical event due to slow client") |
| 179 | } |
| 180 | } |
| 181 | }() |
| 182 | } |
| 183 | } |
| 184 | // Close all listeners when input closes |
| 185 | s.listenerMu.Lock() |
| 186 | for _, ch := range s.listeners { |
| 187 | close(ch) |
| 188 | } |
| 189 | s.listeners = nil |
| 190 | s.listenerMu.Unlock() |
| 191 | |
| 192 | if s.reportTicker != nil { |
| 193 | s.reportTicker.Stop() |
| 194 | } |
| 195 | } |
| 196 | |
| 197 | func (s *LocalDownloadService) reportProgressLoop() { |
| 198 | lastSpeeds := make(map[string]float64) |
no test coverage detected