downloadBlockWithRetry fetches a block with exponential backoff retry.
(offset int64)
| 243 | |
| 244 | // downloadBlockWithRetry fetches a block with exponential backoff retry. |
| 245 | func (p *StreamPipe) downloadBlockWithRetry(offset int64) ([]byte, error) { |
| 246 | var lastErr error |
| 247 | |
| 248 | // TODO: make configurable later |
| 249 | backoff := 100 * time.Millisecond // initial backoff = 100ms |
| 250 | const maxBackoff = 15 * time.Second // max backoff = 15s |
| 251 | |
| 252 | for attempt := 0; attempt < config.ValueOf.StreamMaxRetries; attempt++ { |
| 253 | // check context before each attempt |
| 254 | if p.ctx.Err() != nil { |
| 255 | return nil, p.ctx.Err() |
| 256 | } |
| 257 | |
| 258 | ctx, cancel := context.WithTimeout(p.ctx, time.Duration(config.ValueOf.StreamTimeoutSec)*time.Second) |
| 259 | data, err := utils.TimeFuncWithResult(p.log, "downloadBlock", func() ([]byte, error) { |
| 260 | return p.downloadBlock(ctx, offset) |
| 261 | }) |
| 262 | cancel() |
| 263 | |
| 264 | if err == nil { |
| 265 | return data, nil |
| 266 | } |
| 267 | |
| 268 | lastErr = err |
| 269 | |
| 270 | // don't retry on context cancellation |
| 271 | if p.ctx.Err() != nil { |
| 272 | return nil, p.ctx.Err() |
| 273 | } |
| 274 | |
| 275 | // exponential backoff |
| 276 | select { |
| 277 | case <-time.After(backoff): |
| 278 | backoff *= 2 |
| 279 | // making sure backoff doesn't grow indefinitely in case of persistent failures |
| 280 | if backoff > maxBackoff { |
| 281 | backoff = maxBackoff |
| 282 | } |
| 283 | case <-p.ctx.Done(): |
| 284 | return nil, p.ctx.Err() |
| 285 | } |
| 286 | } |
| 287 | |
| 288 | return nil, fmt.Errorf("%w: %v", ErrMaxRetriesExceeded, lastErr) |
| 289 | } |
| 290 | |
| 291 | // downloadBlock fetches a single block from Telegram. |
| 292 | func (p *StreamPipe) downloadBlock(ctx context.Context, offset int64) ([]byte, error) { |
no test coverage detected