(reg prometheus.Registerer, size int, writeChunk writeChunkF)
// writeChunkF is the signature of the callback that newChunkWriteQueue accepts
// and stores in the queue's writeChunk field.
// NOTE(review): the parameters are unnamed here; presumably the two int64
// values are a chunk's min/max timestamps and the two bools are per-write
// flags — confirm against the implementation passed in by the caller.
type writeChunkF func(HeadSeriesRef, int64, int64, chunkenc.Chunk, ChunkDiskMapperRef, bool, bool) error
| 81 | |
| 82 | func newChunkWriteQueue(reg prometheus.Registerer, size int, writeChunk writeChunkF) *chunkWriteQueue { |
| 83 | counters := prometheus.NewCounterVec( |
| 84 | prometheus.CounterOpts{ |
| 85 | Name: "prometheus_tsdb_chunk_write_queue_operations_total", |
| 86 | Help: "Number of operations on the chunk_write_queue.", |
| 87 | }, |
| 88 | []string{"operation"}, |
| 89 | ) |
| 90 | |
| 91 | segmentSize := min(size, maxChunkQueueSegmentSize) |
| 92 | |
| 93 | q := &chunkWriteQueue{ |
| 94 | jobs: newWriteJobQueue(size, segmentSize), |
| 95 | chunkRefMap: make(map[ChunkDiskMapperRef]chunkenc.Chunk), |
| 96 | chunkRefMapLastShrink: time.Now(), |
| 97 | writeChunk: writeChunk, |
| 98 | |
| 99 | adds: counters.WithLabelValues("add"), |
| 100 | gets: counters.WithLabelValues("get"), |
| 101 | completed: counters.WithLabelValues("complete"), |
| 102 | shrink: counters.WithLabelValues("shrink"), |
| 103 | } |
| 104 | |
| 105 | if reg != nil { |
| 106 | reg.MustRegister(counters) |
| 107 | } |
| 108 | |
| 109 | q.start() |
| 110 | return q |
| 111 | } |
| 112 | |
| 113 | func (c *chunkWriteQueue) start() { |
| 114 | c.workerWg.Go(func() { |
searching dependent graphs…