| 434 | } |
| 435 | |
| 436 | func (l *ioCopyLimiter) limitedCopy(ctx context.Context, dst io.Writer, src io.Reader) error { |
| 437 | var ( |
| 438 | bufRef = bufPool.Get().(*[]byte) |
| 439 | buf = *bufRef |
| 440 | timer = time.NewTimer(0) |
| 441 | written int64 |
| 442 | ) |
| 443 | |
| 444 | defer bufPool.Put(bufRef) |
| 445 | |
| 446 | stopTimer := func(t *time.Timer, needRecv bool) { |
| 447 | if !t.Stop() && needRecv { |
| 448 | <-t.C |
| 449 | } |
| 450 | } |
| 451 | |
| 452 | waitForRetry := func(t *time.Timer, delay time.Duration) error { |
| 453 | needRecv := true |
| 454 | |
| 455 | t.Reset(delay) |
| 456 | select { |
| 457 | case <-t.C: |
| 458 | needRecv = false |
| 459 | case <-ctx.Done(): |
| 460 | return ctx.Err() |
| 461 | } |
| 462 | stopTimer(t, needRecv) |
| 463 | return nil |
| 464 | } |
| 465 | |
| 466 | stopTimer(timer, true) |
| 467 | defer timer.Stop() |
| 468 | for { |
| 469 | if written > int64(l.limitedBytes) { |
| 470 | l.hitCircuitBreaker = true |
| 471 | |
| 472 | log.G(ctx).Warnf("after %v bytes transferred, enable breaker and retransfer after %v", written, l.retryAfter) |
| 473 | if wer := waitForRetry(timer, l.retryAfter); wer != nil { |
| 474 | return wer |
| 475 | } |
| 476 | |
| 477 | written = 0 |
| 478 | l.hitCircuitBreaker = false |
| 479 | } |
| 480 | |
| 481 | nr, er := io.ReadAtLeast(src, buf, len(buf)) |
| 482 | if nr > 0 { |
| 483 | nw, ew := dst.Write(buf[0:nr]) |
| 484 | if nw > 0 { |
| 485 | written += int64(nw) |
| 486 | } |
| 487 | if ew != nil { |
| 488 | return ew |
| 489 | } |
| 490 | if nr != nw { |
| 491 | return io.ErrShortWrite |
| 492 | } |
| 493 | } |