(ctx context.Context)
| 1202 | } |
| 1203 | |
| 1204 | func (db *DB) run(ctx context.Context) { |
| 1205 | defer close(db.donec) |
| 1206 | |
| 1207 | backoff := time.Duration(0) |
| 1208 | |
| 1209 | for { |
| 1210 | select { |
| 1211 | case <-db.stopc: |
| 1212 | return |
| 1213 | case <-time.After(backoff): |
| 1214 | } |
| 1215 | |
| 1216 | select { |
| 1217 | case <-time.After(db.opts.BlockReloadInterval): |
| 1218 | db.cmtx.Lock() |
| 1219 | if err := db.reloadBlocks(); err != nil { |
| 1220 | db.logger.Error("reloadBlocks", "err", err) |
| 1221 | } |
| 1222 | db.cmtx.Unlock() |
| 1223 | |
| 1224 | select { |
| 1225 | case db.compactc <- struct{}{}: |
| 1226 | default: |
| 1227 | } |
| 1228 | // We attempt mmapping of head chunks regularly. |
| 1229 | db.head.mmapHeadChunks() |
| 1230 | |
| 1231 | numStaleSeries, numSeries := db.Head().NumStaleSeries(), db.Head().NumSeries() |
| 1232 | if db.autoCompact && numSeries > 0 && db.opts.staleSeriesCompactionThreshold.Load() > 0 { |
| 1233 | staleSeriesRatio := float64(numStaleSeries) / float64(numSeries) |
| 1234 | if staleSeriesRatio >= db.opts.staleSeriesCompactionThreshold.Load() { |
| 1235 | nextCompactionIsSoon := false |
| 1236 | if !db.lastHeadCompactionTime.IsZero() { |
| 1237 | compactionInterval := time.Duration(db.head.chunkRange.Load()) * time.Millisecond |
| 1238 | nextEstimatedCompactionTime := db.lastHeadCompactionTime.Add(compactionInterval) |
| 1239 | if time.Now().Add(10 * time.Minute).After(nextEstimatedCompactionTime) { |
| 1240 | // Next compaction is starting within next 10 mins. |
| 1241 | nextCompactionIsSoon = true |
| 1242 | } |
| 1243 | } |
| 1244 | |
| 1245 | if !nextCompactionIsSoon { |
| 1246 | if err := db.CompactStaleHead(); err != nil { |
| 1247 | db.logger.Error("immediate stale series compaction failed", "err", err) |
| 1248 | } |
| 1249 | } |
| 1250 | } |
| 1251 | } |
| 1252 | |
| 1253 | case <-db.compactc: |
| 1254 | db.metrics.compactionsTriggered.Inc() |
| 1255 | |
| 1256 | db.autoCompactMtx.Lock() |
| 1257 | if db.autoCompact { |
| 1258 | if err := db.Compact(ctx); err != nil { |
| 1259 | db.logger.Error("compaction failed", "err", err) |
| 1260 | backoff = exponential(backoff, 1*time.Second, 1*time.Minute) |
| 1261 | } else { |
no test coverage detected