When nodes == nil, transmit via all established streams in the bundle; otherwise, restrict transmission to the specified subset of destinations (nodes).
(obj *transport.Obj, roc cos.ReadOpenCloser, nodes ...*cluster.Snode)
| 140 | // when (nodes == nil) transmit via all established streams in a bundle |
| 141 | // otherwise, restrict to the specified subset (nodes) |
| 142 | func (sb *Streams) Send(obj *transport.Obj, roc cos.ReadOpenCloser, nodes ...*cluster.Snode) (err error) { |
| 143 | debug.Assert(!transport.ReservedOpcode(obj.Hdr.Opcode)) |
| 144 | streams := sb.get() |
| 145 | if len(streams) == 0 { |
| 146 | err = fmt.Errorf("no streams %s => .../%s", sb.lsnode, sb.trname) |
| 147 | } else if nodes != nil && len(nodes) == 0 { |
| 148 | err = fmt.Errorf("no destinations %s => .../%s", sb.lsnode, sb.trname) |
| 149 | } else if obj.IsUnsized() && sb.extra.SizePDU == 0 { |
| 150 | err = fmt.Errorf("[%s/%s] sending unsized object supported only with PDUs", obj.Hdr.Bck, obj.Hdr.ObjName) |
| 151 | } |
| 152 | |
| 153 | debug.AssertNoErr(err) |
| 154 | if obj.Callback == nil { |
| 155 | obj.Callback = sb.extra.Callback |
| 156 | } |
| 157 | if obj.IsHeaderOnly() { |
| 158 | roc = nil |
| 159 | } |
| 160 | if err != nil { |
| 161 | // compare w/ transport doCmpl() |
| 162 | _doCmpl(obj, roc, err) |
| 163 | return |
| 164 | } |
| 165 | |
| 166 | if nodes == nil { |
| 167 | idx, cnt := 0, len(streams) |
| 168 | obj.SetPrc(cnt) |
| 169 | // Reader-reopening logic: since the streams in a bundle are mutually independent |
| 170 | // and asynchronous, reader.Open() (aka reopen) is skipped for the 1st replica |
| 171 | // that we put on the wire and is done for the 2nd, 3rd, etc. replicas. |
| 172 | // In other words, for the N object replicas over the N bundled streams, the |
| 173 | // original reader will get reopened (N-1) times. |
| 174 | for sid, robin := range streams { |
| 175 | if sb.lsnode.ID() == sid { |
| 176 | continue |
| 177 | } |
| 178 | if err = sb.sendOne(obj, roc, robin, idx, cnt); err != nil { |
| 179 | return |
| 180 | } |
| 181 | idx++ |
| 182 | } |
| 183 | } else { |
| 184 | // first, check streams vs destinations |
| 185 | for _, di := range nodes { |
| 186 | if _, ok := streams[di.ID()]; ok { |
| 187 | continue |
| 188 | } |
| 189 | err = cmn.NewErrNotFound("destination mismatch: stream (%s) => %s", sb, di) |
| 190 | _doCmpl(obj, roc, err) // ditto |
| 191 | return |
| 192 | } |
| 193 | // second, do send. Same comment wrt reopening. |
| 194 | cnt := len(nodes) |
| 195 | obj.SetPrc(cnt) |
| 196 | for idx, di := range nodes { |
| 197 | robin := streams[di.ID()] |
| 198 | if err = sb.sendOne(obj, roc, robin, idx, cnt); err != nil { |
| 199 | return |