object stream //
(client Client, dstURL, dstID string, extra *Extra)
| 100 | /////////////////// |
| 101 | |
| 102 | func NewObjStream(client Client, dstURL, dstID string, extra *Extra) (s *Stream) { |
| 103 | if extra == nil { |
| 104 | extra = &Extra{Config: cmn.GCO.Get()} |
| 105 | } else if extra.Config == nil { |
| 106 | extra.Config = cmn.GCO.Get() |
| 107 | } |
| 108 | s = &Stream{streamBase: *newStreamBase(client, dstURL, dstID, extra)} |
| 109 | s.streamBase.streamer = s |
| 110 | s.callback = extra.Callback |
| 111 | if extra.Compressed() { |
| 112 | s.initCompression(extra) |
| 113 | } |
| 114 | debug.Assert(s.usePDU() == extra.UsePDU()) |
| 115 | |
| 116 | burst := burst(extra.Config) // num objects the caller can post without blocking |
| 117 | s.workCh = make(chan *Obj, burst) // Send Qeueue (SQ) |
| 118 | s.cmplCh = make(chan cmpl, burst) // Send Completion Queue (SCQ) |
| 119 | |
| 120 | s.wg.Add(2) |
| 121 | go s.sendLoop(dryrun()) // handle SQ |
| 122 | go s.cmplLoop() // handle SCQ |
| 123 | |
| 124 | gc.ctrlCh <- ctrl{&s.streamBase, true /* collect */} |
| 125 | return |
| 126 | } |
| 127 | |
| 128 | // Asynchronously send an object (transport.Obj) defined by its header and its reader. |
| 129 | // |