MCPcopy
hub / github.com/NVIDIA/aistore / Send

Method Send

transport/bundle/stream_bundle.go:142–204  ·  view source on GitHub ↗

When nodes == nil, transmit via all established streams in the bundle; otherwise, restrict transmission to the specified subset (nodes).

(obj *transport.Obj, roc cos.ReadOpenCloser, nodes ...*cluster.Snode)

Source from the content-addressed store, hash-verified

140// when (nodes == nil) transmit via all established streams in a bundle
141// otherwise, restrict to the specified subset (nodes)
142func (sb *Streams) Send(obj *transport.Obj, roc cos.ReadOpenCloser, nodes ...*cluster.Snode) (err error) {
143 debug.Assert(!transport.ReservedOpcode(obj.Hdr.Opcode))
144 streams := sb.get()
145 if len(streams) == 0 {
146 err = fmt.Errorf("no streams %s => .../%s", sb.lsnode, sb.trname)
147 } else if nodes != nil && len(nodes) == 0 {
148 err = fmt.Errorf("no destinations %s => .../%s", sb.lsnode, sb.trname)
149 } else if obj.IsUnsized() && sb.extra.SizePDU == 0 {
150 err = fmt.Errorf("[%s/%s] sending unsized object supported only with PDUs", obj.Hdr.Bck, obj.Hdr.ObjName)
151 }
152
153 debug.AssertNoErr(err)
154 if obj.Callback == nil {
155 obj.Callback = sb.extra.Callback
156 }
157 if obj.IsHeaderOnly() {
158 roc = nil
159 }
160 if err != nil {
161 // compare w/ transport doCmpl()
162 _doCmpl(obj, roc, err)
163 return
164 }
165
166 if nodes == nil {
167 idx, cnt := 0, len(streams)
168 obj.SetPrc(cnt)
169 // Reader-reopening logic: since the streams in a bundle are mutually independent
170 // and asynchronous, reader.Open() (aka reopen) is skipped for the 1st replica
171 // that we put on the wire and is done for the 2nd, 3rd, etc. replicas.
172 // In other words, for the N object replicas over the N bundled streams, the
173 // original reader will get reopened (N-1) times.
174 for sid, robin := range streams {
175 if sb.lsnode.ID() == sid {
176 continue
177 }
178 if err = sb.sendOne(obj, roc, robin, idx, cnt); err != nil {
179 return
180 }
181 idx++
182 }
183 } else {
184 // first, check streams vs destinations
185 for _, di := range nodes {
186 if _, ok := streams[di.ID()]; ok {
187 continue
188 }
189 err = cmn.NewErrNotFound("destination mismatch: stream (%s) => %s", sb, di)
190 _doCmpl(obj, roc, err) // ditto
191 return
192 }
193 // second, do send. Same comment wrt reopening.
194 cnt := len(nodes)
195 obj.SetPrc(cnt)
196 for idx, di := range nodes {
197 robin := streams[di.ID()]
198 if err = sb.sendOne(obj, roc, robin, idx, cnt); err != nil {
199 return

Callers 2

testBundleFunction · 0.95
sendOneMethod · 0.95

Calls 11

getMethod · 0.95
sendOneMethod · 0.95
AssertFunction · 0.92
ReservedOpcodeFunction · 0.92
AssertNoErrFunction · 0.92
NewErrNotFoundFunction · 0.92
_doCmplFunction · 0.85
SetPrcMethod · 0.80
IDMethod · 0.65
IsUnsizedMethod · 0.45
IsHeaderOnlyMethod · 0.45

Tested by 1

testBundleFunction · 0.76