(p []byte)
| 81 | } |
| 82 | |
| 83 | func (wbs *writeByteStream) Write(p []byte) (n int, err error) { |
| 84 | for len(p) > 0 { |
| 85 | remaining := wbs.remaining.Load() |
| 86 | if remaining == 0 { |
| 87 | // Don't wait for window update since there are remaining |
| 88 | select { |
| 89 | case <-wbs.ctx.Done(): |
| 90 | // TODO: Send error message on stream before close to allow remote side to return error |
| 91 | err = io.ErrShortWrite |
| 92 | return |
| 93 | case <-wbs.updated: |
| 94 | continue |
| 95 | } |
| 96 | } |
| 97 | var max int32 = maxRead |
| 98 | if max > int32(len(p)) { |
| 99 | max = int32(len(p)) |
| 100 | } |
| 101 | if max > remaining { |
| 102 | max = remaining |
| 103 | } |
| 104 | // TODO: continue |
| 105 | // remaining = remaining - int32(n) |
| 106 | |
| 107 | data := &transferapi.Data{ |
| 108 | Data: p[:max], |
| 109 | } |
| 110 | var anyType typeurl.Any |
| 111 | anyType, err = typeurl.MarshalAny(data) |
| 112 | if err != nil { |
| 113 | log.G(wbs.ctx).WithError(err).Errorf("failed to marshal data for send") |
| 114 | // TODO: Send error message on stream before close to allow remote side to return error |
| 115 | return |
| 116 | } |
| 117 | if err = wbs.stream.Send(anyType); err != nil { |
| 118 | log.G(wbs.ctx).WithError(err).Errorf("send failed") |
| 119 | return |
| 120 | } |
| 121 | n += int(max) |
| 122 | p = p[max:] |
| 123 | wbs.remaining.Add(-1 * max) |
| 124 | } |
| 125 | return |
| 126 | } |
| 127 | |
| 128 | func (wbs *writeByteStream) Close() error { |
| 129 | return wbs.stream.Close() |
nothing calls this directly
no test coverage detected