(newCtx context.Context)
| 267 | } |
| 268 | |
| 269 | func (m *Module) run(newCtx context.Context) { |
| 270 | if m.isRunning { |
| 271 | m.l.Warn("Metric module is already running. Cannot start again.") |
| 272 | return |
| 273 | } |
| 274 | |
| 275 | cbFunc := pubsub.CallBackFunc(m.PodCallBackFn) |
| 276 | m.pubsubPodSub = m.pubsub.Subscribe(common.PubSubPods, &cbFunc) |
| 277 | |
| 278 | m.wg.Add(1) |
| 279 | go func() { |
| 280 | m.Lock() |
| 281 | m.isRunning = true |
| 282 | m.ctx = newCtx |
| 283 | m.Unlock() |
| 284 | |
| 285 | evReader := m.enricher.ExportReader() |
| 286 | for { |
| 287 | ev := evReader.NextFollow(newCtx) |
| 288 | if ev == nil { |
| 289 | break |
| 290 | } |
| 291 | |
| 292 | switch ev.Event.(type) { |
| 293 | case *flow.Flow: |
| 294 | m.RLock() |
| 295 | f := ev.Event.(*flow.Flow) |
| 296 | m.l.Debug("converted flow object", zap.Any("flow l4", f.IP)) |
| 297 | for _, metricObj := range m.registry { |
| 298 | metricObj.ProcessFlow(f) |
| 299 | } |
| 300 | m.RUnlock() |
| 301 | case *flow.LostEvent: |
| 302 | ev := ev.Event.(*flow.LostEvent) |
| 303 | // the number of lost events == the size of the ring buffer initialized. |
| 304 | metrics.LostEventsCounter.WithLabelValues(utils.EnricherRing, string(metricModuleReq)).Add(float64(ev.NumEventsLost)) |
| 305 | default: |
| 306 | m.l.Warn("Unknown event type", zap.Any("event", ev)) |
| 307 | } |
| 308 | } |
| 309 | |
| 310 | err := evReader.Close() |
| 311 | if err != nil { |
| 312 | m.l.Error("Error closing the event reader", zap.Error(err)) |
| 313 | } |
| 314 | m.Lock() |
| 315 | m.isRunning = false |
| 316 | m.ctx = nil |
| 317 | m.Unlock() |
| 318 | |
| 319 | m.wg.Done() |
| 320 | }() |
| 321 | |
| 322 | m.wg.Add(1) |
| 323 | go func() { |
| 324 | ticker := time.NewTicker(interval) |
| 325 | defer ticker.Stop() |
| 326 | for { |
no test coverage detected