(persistenceInterval time.Duration)
| 202 | } |
| 203 | |
| 204 | func (dms *DiskMetricStore) loop(persistenceInterval time.Duration) { |
| 205 | lastPersist := time.Now() |
| 206 | persistScheduled := false |
| 207 | lastWrite := time.Time{} |
| 208 | persistDone := make(chan time.Time) |
| 209 | var persistTimer *time.Timer |
| 210 | |
| 211 | checkPersist := func() { |
| 212 | if dms.persistenceFile != "" && !persistScheduled && lastWrite.After(lastPersist) { |
| 213 | persistTimer = time.AfterFunc( |
| 214 | persistenceInterval-lastWrite.Sub(lastPersist), |
| 215 | func() { |
| 216 | persistStarted := time.Now() |
| 217 | if err := dms.persist(); err != nil { |
| 218 | dms.logger.Error("error persisting metrics", "err", err) |
| 219 | } else { |
| 220 | dms.logger.Info("metrics persisted", "file", dms.persistenceFile) |
| 221 | } |
| 222 | persistDone <- persistStarted |
| 223 | }, |
| 224 | ) |
| 225 | persistScheduled = true |
| 226 | } |
| 227 | } |
| 228 | |
| 229 | for { |
| 230 | select { |
| 231 | case wr := <-dms.writeQueue: |
| 232 | lastWrite = time.Now() |
| 233 | if dms.checkWriteRequest(wr) { |
| 234 | dms.processWriteRequest(wr) |
| 235 | } else { |
| 236 | dms.setPushFailedTimestamp(wr) |
| 237 | } |
| 238 | if wr.Done != nil { |
| 239 | close(wr.Done) |
| 240 | } |
| 241 | checkPersist() |
| 242 | case lastPersist = <-persistDone: |
| 243 | persistScheduled = false |
| 244 | checkPersist() // In case something has been written in the meantime. |
| 245 | case <-dms.drain: |
| 246 | // Prevent a scheduled persist from firing later. |
| 247 | if persistTimer != nil { |
| 248 | persistTimer.Stop() |
| 249 | } |
| 250 | // Now draining... |
| 251 | for { |
| 252 | select { |
| 253 | case wr := <-dms.writeQueue: |
| 254 | if dms.checkWriteRequest(wr) { |
| 255 | dms.processWriteRequest(wr) |
| 256 | } else { |
| 257 | dms.setPushFailedTimestamp(wr) |
| 258 | } |
| 259 | default: |
| 260 | dms.done <- dms.persist() |
| 261 | return |
no test coverage detected