NewQueueManager builds a new QueueManager and starts a new WAL watcher with queue manager as the WriteTo destination. The WAL watcher takes the dir parameter as the base directory for where the WAL shall be located. Note that the full path to the WAL directory will be constructed as /wal.
( metrics *queueManagerMetrics, watcherMetrics *wlog.WatcherMetrics, readerMetrics *wlog.LiveReaderMetrics, logger *slog.Logger, dir string, samplesIn *ewmaRate, cfg config.QueueConfig, mCfg config.MetadataConfig, externalLabels labels.Labels, relabelConfigs []*relabel.Config, client WriteClient, flushDeadline time.Duration, interner *pool, highestRecvTimestamp *maxTimestamp, sm ReadyScrapeManager, enableExemplarRemoteWrite bool, enableNativeHistogramRemoteWrite bool, enableTypeAndUnitLabels bool, protoMsg remoteapi.WriteMessageType, recordBuf *record.BuffersPool, )
| 469 | // for where the WAL shall be located. Note that the full path to |
| 470 | // the WAL directory will be constructed as <dir>/wal. |
| 471 | func NewQueueManager( |
| 472 | metrics *queueManagerMetrics, |
| 473 | watcherMetrics *wlog.WatcherMetrics, |
| 474 | readerMetrics *wlog.LiveReaderMetrics, |
| 475 | logger *slog.Logger, |
| 476 | dir string, |
| 477 | samplesIn *ewmaRate, |
| 478 | cfg config.QueueConfig, |
| 479 | mCfg config.MetadataConfig, |
| 480 | externalLabels labels.Labels, |
| 481 | relabelConfigs []*relabel.Config, |
| 482 | client WriteClient, |
| 483 | flushDeadline time.Duration, |
| 484 | interner *pool, |
| 485 | highestRecvTimestamp *maxTimestamp, |
| 486 | sm ReadyScrapeManager, |
| 487 | enableExemplarRemoteWrite bool, |
| 488 | enableNativeHistogramRemoteWrite bool, |
| 489 | enableTypeAndUnitLabels bool, |
| 490 | protoMsg remoteapi.WriteMessageType, |
| 491 | recordBuf *record.BuffersPool, |
| 492 | ) *QueueManager { |
| 493 | if logger == nil { |
| 494 | logger = promslog.NewNopLogger() |
| 495 | } |
| 496 | |
| 497 | // Copy externalLabels into a slice, which we need for processExternalLabels. |
| 498 | extLabelsSlice := make([]labels.Label, 0, externalLabels.Len()) |
| 499 | externalLabels.Range(func(l labels.Label) { |
| 500 | extLabelsSlice = append(extLabelsSlice, l) |
| 501 | }) |
| 502 | |
| 503 | logger = logger.With(remoteName, client.Name(), endpoint, client.Endpoint()) |
| 504 | t := &QueueManager{ |
| 505 | logger: logger, |
| 506 | flushDeadline: flushDeadline, |
| 507 | cfg: cfg, |
| 508 | mcfg: mCfg, |
| 509 | externalLabels: extLabelsSlice, |
| 510 | relabelConfigs: relabelConfigs, |
| 511 | storeClient: client, |
| 512 | sendExemplars: enableExemplarRemoteWrite, |
| 513 | sendNativeHistograms: enableNativeHistogramRemoteWrite, |
| 514 | enableTypeAndUnitLabels: enableTypeAndUnitLabels, |
| 515 | |
| 516 | seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels), |
| 517 | seriesMetadata: make(map[chunks.HeadSeriesRef]*metadata.Metadata), |
| 518 | seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int), |
| 519 | droppedSeries: make(map[chunks.HeadSeriesRef]struct{}), |
| 520 | builder: labels.NewBuilder(labels.EmptyLabels()), |
| 521 | |
| 522 | numShards: cfg.MinShards, |
| 523 | reshardChan: make(chan int), |
| 524 | quit: make(chan struct{}), |
| 525 | |
| 526 | dataIn: samplesIn, |
| 527 | dataDropped: newEWMARate(ewmaWeight, shardUpdateDuration), |
| 528 | dataOut: newEWMARate(ewmaWeight, shardUpdateDuration), |
searching dependent graphs…