MCPcopy Index your code
hub / github.com/containerd/containerd / ReadByteStream

Function ReadByteStream

core/transfer/streaming/reader.go:39–74  ·  view source on GitHub ↗
(ctx context.Context, stream streaming.Stream)

Source from the content-addressed store, hash-verified

37}
38
39func ReadByteStream(ctx context.Context, stream streaming.Stream) io.ReadCloser {
40 rbs := &readByteStream{
41 ctx: ctx,
42 stream: stream,
43 window: 0,
44 errCh: make(chan error),
45 updated: make(chan struct{}, 1),
46 }
47 go func() {
48 for {
49 if rbs.window >= windowSize {
50 select {
51 case <-ctx.Done():
52 return
53 case <-rbs.updated:
54 continue
55 }
56 }
57 update := &transferapi.WindowUpdate{
58 Update: windowSize,
59 }
60 anyType, err := typeurl.MarshalAny(update)
61 if err != nil {
62 rbs.errCh <- err
63 return
64 }
65 if err := stream.Send(anyType); err == nil {
66 rbs.window += windowSize
67 } else if !errors.Is(err, io.EOF) {
68 rbs.errCh <- err
69 }
70 }
71
72 }()
73 return rbs
74}
75
76func (r *readByteStream) Read(p []byte) (n int, err error) {
77 plen := len(p)

Callers 1

openOutputStreamFunction · 0.92

Calls 3

DoneMethod · 0.65
MarshalAnyMethod · 0.65
SendMethod · 0.65

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…