NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter
(dir Direction, manifest Manifest, remote string, options ...Option)
| 300 | |
| 301 | // NewTransferQueue builds a TransferQueue, direction and underlying mechanism determined by adapter |
| 302 | func NewTransferQueue(dir Direction, manifest Manifest, remote string, options ...Option) *TransferQueue { |
| 303 | q := &TransferQueue{ |
| 304 | direction: dir, |
| 305 | remote: remote, |
| 306 | errorc: make(chan error), |
| 307 | transfers: make(map[string]*objects), |
| 308 | trMutex: &sync.Mutex{}, |
| 309 | manifest: manifest, |
| 310 | rc: newRetryCounter(), |
| 311 | wait: newAbortableWaitGroup(), |
| 312 | } |
| 313 | |
| 314 | for _, opt := range options { |
| 315 | opt(q) |
| 316 | } |
| 317 | |
| 318 | if q.batchSize <= 0 { |
| 319 | q.batchSize = defaultBatchSize |
| 320 | } |
| 321 | if q.bufferDepth <= 0 { |
| 322 | q.bufferDepth = q.batchSize |
| 323 | } |
| 324 | if q.meter != nil { |
| 325 | q.meter.Direction = q.direction |
| 326 | } |
| 327 | |
| 328 | q.incoming = make(chan *objectTuple, q.bufferDepth) |
| 329 | q.collectorWait.Add(1) |
| 330 | q.errorwait.Add(1) |
| 331 | q.run() |
| 332 | |
| 333 | return q |
| 334 | } |
| 335 | |
| 336 | // Ensure we have a concrete manifest and that certain delayed variables are set |
| 337 | // properly. |