MCPcopy
hub / github.com/microsoft/retina / run

Method run

pkg/module/metrics/metrics_module.go:269–342  ·  view source on GitHub ↗
(newCtx context.Context)

Source from the content-addressed store, hash-verified

267}
268
269func (m *Module) run(newCtx context.Context) {
270 if m.isRunning {
271 m.l.Warn("Metric module is already running. Cannot start again.")
272 return
273 }
274
275 cbFunc := pubsub.CallBackFunc(m.PodCallBackFn)
276 m.pubsubPodSub = m.pubsub.Subscribe(common.PubSubPods, &cbFunc)
277
278 m.wg.Add(1)
279 go func() {
280 m.Lock()
281 m.isRunning = true
282 m.ctx = newCtx
283 m.Unlock()
284
285 evReader := m.enricher.ExportReader()
286 for {
287 ev := evReader.NextFollow(newCtx)
288 if ev == nil {
289 break
290 }
291
292 switch ev.Event.(type) {
293 case *flow.Flow:
294 m.RLock()
295 f := ev.Event.(*flow.Flow)
296 m.l.Debug("converted flow object", zap.Any("flow l4", f.IP))
297 for _, metricObj := range m.registry {
298 metricObj.ProcessFlow(f)
299 }
300 m.RUnlock()
301 case *flow.LostEvent:
302 ev := ev.Event.(*flow.LostEvent)
303 // the number of lost events == the size of the ring buffer initialized.
304 metrics.LostEventsCounter.WithLabelValues(utils.EnricherRing, string(metricModuleReq)).Add(float64(ev.NumEventsLost))
305 default:
306 m.l.Warn("Unknown event type", zap.Any("event", ev))
307 }
308 }
309
310 err := evReader.Close()
311 if err != nil {
312 m.l.Error("Error closing the event reader", zap.Error(err))
313 }
314 m.Lock()
315 m.isRunning = false
316 m.ctx = nil
317 m.Unlock()
318
319 m.wg.Done()
320 }()
321
322 m.wg.Add(1)
323 go func() {
324 ticker := time.NewTicker(interval)
325 defer ticker.Stop()
326 for {

Callers 1

Reconcile (Method) · 0.95

Calls 12

applyDirtyPods (Method) · 0.95
CallBackFunc (FuncType) · 0.92
Info (Method) · 0.80
Subscribe (Method) · 0.65
Add (Method) · 0.65
ExportReader (Method) · 0.65
ProcessFlow (Method) · 0.65
WithLabelValues (Method) · 0.65
Close (Method) · 0.65
Stop (Method) · 0.65
Unsubscribe (Method) · 0.65
Error (Method) · 0.45

Tested by

no test coverage detected