()
| 1229 | } |
| 1230 | |
| 1231 | func (t *QueueManager) reshardLoop() { |
| 1232 | defer t.wg.Done() |
| 1233 | |
| 1234 | for { |
| 1235 | select { |
| 1236 | case numShards := <-t.reshardChan: |
| 1237 | // We start the newShards after we have stopped (the therefore completely |
| 1238 | // flushed) the oldShards, to guarantee we only every deliver samples in |
| 1239 | // order. |
| 1240 | t.shards.stop() |
| 1241 | t.shards.start(numShards) |
| 1242 | case <-t.quit: |
| 1243 | return |
| 1244 | } |
| 1245 | } |
| 1246 | } |
| 1247 | |
| 1248 | func (t *QueueManager) newShards() *shards { |
| 1249 | s := &shards{ |