(wr WriteRequest)
| 266 | } |
| 267 | |
| 268 | func (dms *DiskMetricStore) processWriteRequest(wr WriteRequest) { |
| 269 | dms.lock.Lock() |
| 270 | defer dms.lock.Unlock() |
| 271 | |
| 272 | key := groupingKeyFor(wr.Labels) |
| 273 | |
| 274 | if wr.MetricFamilies == nil { |
| 275 | // No MetricFamilies means delete request. Delete the whole |
| 276 | // metric group, and we are done here. |
| 277 | delete(dms.metricGroups, key) |
| 278 | return |
| 279 | } |
| 280 | // Otherwise, it's an update. |
| 281 | group, ok := dms.metricGroups[key] |
| 282 | if !ok { |
| 283 | group = MetricGroup{ |
| 284 | Labels: wr.Labels, |
| 285 | Metrics: NameToTimestampedMetricFamilyMap{}, |
| 286 | } |
| 287 | dms.metricGroups[key] = group |
| 288 | } else if wr.Replace { |
| 289 | // For replace, we have to delete all metric families in the |
| 290 | // group except pre-existing push timestamps. |
| 291 | for name := range group.Metrics { |
| 292 | if name != pushMetricName && name != pushFailedMetricName { |
| 293 | delete(group.Metrics, name) |
| 294 | } |
| 295 | } |
| 296 | } |
| 297 | wr.MetricFamilies[pushMetricName] = newPushTimestampGauge(wr.Labels, wr.Timestamp) |
| 298 | // Only add a zero push-failed metric if none is there yet, so that a |
| 299 | // previously added fail timestamp is retained. |
| 300 | if _, ok := group.Metrics[pushFailedMetricName]; !ok { |
| 301 | wr.MetricFamilies[pushFailedMetricName] = newPushFailedTimestampGauge(wr.Labels, time.Time{}) |
| 302 | } |
| 303 | for name, mf := range wr.MetricFamilies { |
| 304 | group.Metrics[name] = TimestampedMetricFamily{ |
| 305 | Timestamp: wr.Timestamp, |
| 306 | GobbableMetricFamily: (*GobbableMetricFamily)(mf), |
| 307 | } |
| 308 | } |
| 309 | } |
| 310 | |
| 311 | func (dms *DiskMetricStore) setPushFailedTimestamp(wr WriteRequest) { |
| 312 | dms.lock.Lock() |
no test coverage detected