MCPcopy
hub / github.com/prometheus/pushgateway / loop

Method loop

storage/diskmetricstore.go:204–266  ·  view source on GitHub ↗
(persistenceInterval time.Duration)

Source from the content-addressed store, hash-verified

202}
203
204func (dms *DiskMetricStore) loop(persistenceInterval time.Duration) {
205 lastPersist := time.Now()
206 persistScheduled := false
207 lastWrite := time.Time{}
208 persistDone := make(chan time.Time)
209 var persistTimer *time.Timer
210
211 checkPersist := func() {
212 if dms.persistenceFile != "" && !persistScheduled && lastWrite.After(lastPersist) {
213 persistTimer = time.AfterFunc(
214 persistenceInterval-lastWrite.Sub(lastPersist),
215 func() {
216 persistStarted := time.Now()
217 if err := dms.persist(); err != nil {
218 dms.logger.Error("error persisting metrics", "err", err)
219 } else {
220 dms.logger.Info("metrics persisted", "file", dms.persistenceFile)
221 }
222 persistDone <- persistStarted
223 },
224 )
225 persistScheduled = true
226 }
227 }
228
229 for {
230 select {
231 case wr := <-dms.writeQueue:
232 lastWrite = time.Now()
233 if dms.checkWriteRequest(wr) {
234 dms.processWriteRequest(wr)
235 } else {
236 dms.setPushFailedTimestamp(wr)
237 }
238 if wr.Done != nil {
239 close(wr.Done)
240 }
241 checkPersist()
242 case lastPersist = <-persistDone:
243 persistScheduled = false
244 checkPersist() // In case something has been written in the meantime.
245 case <-dms.drain:
246 // Prevent a scheduled persist from firing later.
247 if persistTimer != nil {
248 persistTimer.Stop()
249 }
250 // Now draining...
251 for {
252 select {
253 case wr := <-dms.writeQueue:
254 if dms.checkWriteRequest(wr) {
255 dms.processWriteRequest(wr)
256 } else {
257 dms.setPushFailedTimestamp(wr)
258 }
259 default:
260 dms.done <- dms.persist()
261 return

Callers 1

NewDiskMetricStore · Function · 0.95

Calls 5

persist · Method · 0.95
checkWriteRequest · Method · 0.95
processWriteRequest · Method · 0.95
Error · Method · 0.80

Tested by

no test coverage detected