MCPcopy Index your code
hub / github.com/containerd/containerd / limitedCopy

Method limitedCopy

integration/image_pull_timeout_test.go:436–502  ·  view source on GitHub ↗
(ctx context.Context, dst io.Writer, src io.Reader)

Source from the content-addressed store, hash-verified

434}
435
436func (l *ioCopyLimiter) limitedCopy(ctx context.Context, dst io.Writer, src io.Reader) error {
437 var (
438 bufRef = bufPool.Get().(*[]byte)
439 buf = *bufRef
440 timer = time.NewTimer(0)
441 written int64
442 )
443
444 defer bufPool.Put(bufRef)
445
446 stopTimer := func(t *time.Timer, needRecv bool) {
447 if !t.Stop() && needRecv {
448 <-t.C
449 }
450 }
451
452 waitForRetry := func(t *time.Timer, delay time.Duration) error {
453 needRecv := true
454
455 t.Reset(delay)
456 select {
457 case <-t.C:
458 needRecv = false
459 case <-ctx.Done():
460 return ctx.Err()
461 }
462 stopTimer(t, needRecv)
463 return nil
464 }
465
466 stopTimer(timer, true)
467 defer timer.Stop()
468 for {
469 if written > int64(l.limitedBytes) {
470 l.hitCircuitBreaker = true
471
472 log.G(ctx).Warnf("after %v bytes transferred, enable breaker and retransfer after %v", written, l.retryAfter)
473 if wer := waitForRetry(timer, l.retryAfter); wer != nil {
474 return wer
475 }
476
477 written = 0
478 l.hitCircuitBreaker = false
479 }
480
481 nr, er := io.ReadAtLeast(src, buf, len(buf))
482 if nr > 0 {
483 nw, ew := dst.Write(buf[0:nr])
484 if nw > 0 {
485 written += int64(nw)
486 }
487 if ew != nil {
488 return ew
489 }
490 if nr != nw {
491 return io.ErrShortWrite
492 }
493 }

Callers 1

ServeHTTP (Method) · 0.80

Calls 7

Put (Method) · 0.80
Get (Method) · 0.65
Stop (Method) · 0.65
Reset (Method) · 0.65
Done (Method) · 0.65
Err (Method) · 0.65
Write (Method) · 0.65

Tested by

no test coverage detected