MCPcopy
hub / github.com/containerd/containerd / SendStream

Function SendStream

core/transfer/streaming/stream.go:45–141  ·  view source on GitHub ↗
(ctx context.Context, r io.Reader, stream streaming.Stream)

Source from the content-addressed store, hash-verified

43}
44
45func SendStream(ctx context.Context, r io.Reader, stream streaming.Stream) {
46 window := make(chan int32)
47 go func() {
48 defer close(window)
49 for {
50 select {
51 case <-ctx.Done():
52 return
53 default:
54 }
55
56 anyType, err := stream.Recv()
57 if err != nil {
58 if !errors.Is(err, io.EOF) && !errors.Is(err, context.Canceled) {
59 log.G(ctx).WithError(err).Error("send stream ended without EOF")
60 }
61 return
62 }
63 i, err := typeurl.UnmarshalAny(anyType)
64 if err != nil {
65 log.G(ctx).WithError(err).Error("failed to unmarshal stream object")
66 continue
67 }
68 switch v := i.(type) {
69 case *transferapi.WindowUpdate:
70 select {
71 case <-ctx.Done():
72 return
73 case window <- v.Update:
74 }
75 default:
76 log.G(ctx).Errorf("unexpected stream object of type %T", i)
77 }
78 }
79 }()
80 go func() {
81 defer stream.Close()
82
83 buf := bufPool.Get().(*[]byte)
84 defer bufPool.Put(buf)
85
86 var remaining int32
87
88 for {
89 if remaining > 0 {
90 // Don't wait for window update since there are remaining
91 select {
92 case <-ctx.Done():
93 // TODO: Send error message on stream before close to allow remote side to return error
94 return
95 case update := <-window:
96 remaining += update
97 default:
98 }
99 } else {
100 // Block until window updated
101 select {
102 case <-ctx.Done():

Callers 3

runSendAndReceiveFuzzFunction · 0.85
chainStreamsFunction · 0.85

Calls 10

PutMethod · 0.80
DoneMethod · 0.65
RecvMethod · 0.65
UnmarshalAnyMethod · 0.65
CloseMethod · 0.65
GetMethod · 0.65
ReadMethod · 0.65
MarshalAnyMethod · 0.65
SendMethod · 0.65
ErrorMethod · 0.45

Tested by 3

runSendAndReceiveFuzzFunction · 0.68
chainStreamsFunction · 0.68

Used in the wild real call sites across dependent graphs

searching dependent graphs…