start the shards; must be called before any call to enqueue.
(n int)
| 1282 | |
| 1283 | // start the shards; must be called before any call to enqueue. |
| 1284 | func (s *shards) start(n int) { |
| 1285 | s.mtx.Lock() |
| 1286 | defer s.mtx.Unlock() |
| 1287 | |
| 1288 | s.qm.metrics.pendingSamples.Set(0) |
| 1289 | s.qm.metrics.numShards.Set(float64(n)) |
| 1290 | |
| 1291 | newQueues := make([]*queue, n) |
| 1292 | for i := range n { |
| 1293 | newQueues[i] = newQueue(s.qm.cfg.MaxSamplesPerSend, s.qm.cfg.Capacity) |
| 1294 | } |
| 1295 | |
| 1296 | s.queues = newQueues |
| 1297 | |
| 1298 | var hardShutdownCtx context.Context |
| 1299 | hardShutdownCtx, s.hardShutdown = context.WithCancel(context.Background()) |
| 1300 | s.softShutdown = make(chan struct{}) |
| 1301 | s.running.Store(int32(n)) |
| 1302 | s.done = make(chan struct{}) |
| 1303 | s.enqueuedSamples.Store(0) |
| 1304 | s.enqueuedExemplars.Store(0) |
| 1305 | s.enqueuedHistograms.Store(0) |
| 1306 | s.samplesDroppedOnHardShutdown.Store(0) |
| 1307 | s.exemplarsDroppedOnHardShutdown.Store(0) |
| 1308 | s.histogramsDroppedOnHardShutdown.Store(0) |
| 1309 | s.metadataDroppedOnHardShutdown.Store(0) |
| 1310 | for i := range n { |
| 1311 | go s.runShard(hardShutdownCtx, i, newQueues[i]) |
| 1312 | } |
| 1313 | } |
| 1314 | |
| 1315 | // stop the shards; subsequent call to enqueue will return false. |
| 1316 | func (s *shards) stop() { |