MCPcopy
hub / github.com/prometheus/prometheus / NewQueueManager

Function NewQueueManager

storage/remote/queue_manager.go:471–558  ·  view source on GitHub ↗

NewQueueManager builds a new QueueManager and starts a new WAL watcher with queue manager as the WriteTo destination. The WAL watcher takes the dir parameter as the base directory for where the WAL shall be located. Note that the full path to the WAL directory will be constructed as /wal.

(
	metrics *queueManagerMetrics,
	watcherMetrics *wlog.WatcherMetrics,
	readerMetrics *wlog.LiveReaderMetrics,
	logger *slog.Logger,
	dir string,
	samplesIn *ewmaRate,
	cfg config.QueueConfig,
	mCfg config.MetadataConfig,
	externalLabels labels.Labels,
	relabelConfigs []*relabel.Config,
	client WriteClient,
	flushDeadline time.Duration,
	interner *pool,
	highestRecvTimestamp *maxTimestamp,
	sm ReadyScrapeManager,
	enableExemplarRemoteWrite bool,
	enableNativeHistogramRemoteWrite bool,
	enableTypeAndUnitLabels bool,
	protoMsg remoteapi.WriteMessageType,
	recordBuf *record.BuffersPool,
)

Source from the content-addressed store, hash-verified

469// for where the WAL shall be located. Note that the full path to
470// the WAL directory will be constructed as <dir>/wal.
471func NewQueueManager(
472 metrics *queueManagerMetrics,
473 watcherMetrics *wlog.WatcherMetrics,
474 readerMetrics *wlog.LiveReaderMetrics,
475 logger *slog.Logger,
476 dir string,
477 samplesIn *ewmaRate,
478 cfg config.QueueConfig,
479 mCfg config.MetadataConfig,
480 externalLabels labels.Labels,
481 relabelConfigs []*relabel.Config,
482 client WriteClient,
483 flushDeadline time.Duration,
484 interner *pool,
485 highestRecvTimestamp *maxTimestamp,
486 sm ReadyScrapeManager,
487 enableExemplarRemoteWrite bool,
488 enableNativeHistogramRemoteWrite bool,
489 enableTypeAndUnitLabels bool,
490 protoMsg remoteapi.WriteMessageType,
491 recordBuf *record.BuffersPool,
492) *QueueManager {
493 if logger == nil {
494 logger = promslog.NewNopLogger()
495 }
496
497 // Copy externalLabels into a slice, which we need for processExternalLabels.
498 extLabelsSlice := make([]labels.Label, 0, externalLabels.Len())
499 externalLabels.Range(func(l labels.Label) {
500 extLabelsSlice = append(extLabelsSlice, l)
501 })
502
503 logger = logger.With(remoteName, client.Name(), endpoint, client.Endpoint())
504 t := &QueueManager{
505 logger: logger,
506 flushDeadline: flushDeadline,
507 cfg: cfg,
508 mcfg: mCfg,
509 externalLabels: extLabelsSlice,
510 relabelConfigs: relabelConfigs,
511 storeClient: client,
512 sendExemplars: enableExemplarRemoteWrite,
513 sendNativeHistograms: enableNativeHistogramRemoteWrite,
514 enableTypeAndUnitLabels: enableTypeAndUnitLabels,
515
516 seriesLabels: make(map[chunks.HeadSeriesRef]labels.Labels),
517 seriesMetadata: make(map[chunks.HeadSeriesRef]*metadata.Metadata),
518 seriesSegmentIndexes: make(map[chunks.HeadSeriesRef]int),
519 droppedSeries: make(map[chunks.HeadSeriesRef]struct{}),
520 builder: labels.NewBuilder(labels.EmptyLabels()),
521
522 numShards: cfg.MinShards,
523 reshardChan: make(chan int),
524 quit: make(chan struct{}),
525
526 dataIn: samplesIn,
527 dataDropped: newEWMARate(ewmaWeight, shardUpdateDuration),
528 dataOut: newEWMARate(ewmaWeight, shardUpdateDuration),

Callers 4

newTestQueueManagerFunction · 0.85
BenchmarkStoreSeriesFunction · 0.85
ApplyConfigMethod · 0.85

Calls 10

newShardsMethod · 0.95
NewBuilderFunction · 0.92
EmptyLabelsFunction · 0.92
NewWatcherFunction · 0.92
newEWMARateFunction · 0.85
NewMetadataWatcherFunction · 0.85
LenMethod · 0.65
RangeMethod · 0.65
NameMethod · 0.65
EndpointMethod · 0.65

Tested by 3

newTestQueueManagerFunction · 0.68
BenchmarkStoreSeriesFunction · 0.68

Used in the wild real call sites across dependent graphs

searching dependent graphs…