message stream //
(client Client, dstURL, dstID string)
| 179 | //////////////////// |
| 180 | |
| 181 | func NewMsgStream(client Client, dstURL, dstID string) (s *MsgStream) { |
| 182 | extra := &Extra{Config: cmn.GCO.Get()} |
| 183 | s = &MsgStream{streamBase: *newStreamBase(client, dstURL, dstID, extra)} |
| 184 | s.streamBase.streamer = s |
| 185 | |
| 186 | burst := burst(extra.Config) // num messages the caller can post without blocking |
| 187 | s.workCh = make(chan *Msg, burst) // Send Qeueue or SQ |
| 188 | |
| 189 | s.wg.Add(1) |
| 190 | go s.sendLoop(dryrun()) |
| 191 | |
| 192 | gc.ctrlCh <- ctrl{&s.streamBase, true /* collect */} |
| 193 | return |
| 194 | } |
| 195 | |
| 196 | func (s *MsgStream) Send(msg *Msg) (err error) { |
| 197 | debug.Assert(len(msg.Body) < len(s.maxheader)-int(unsafe.Sizeof(Msg{}))) |