MCPcopy
hub / github.com/crowdsecurity/crowdsec / ReadMessage

Method ReadMessage

pkg/acquisition/modules/kafka/run.go:19–61  ·  view source on GitHub ↗
(ctx context.Context, out chan pipeline.Event)

Source from the content-addressed store, hash-verified

17)
18
19func (s *Source) ReadMessage(ctx context.Context, out chan pipeline.Event) error {
20 if s.Config.GroupID == "" {
21 err := s.Reader.SetOffset(kafka.LastOffset)
22 if err != nil {
23 return fmt.Errorf("while setting offset for reader on topic '%s': %w", s.Config.Topic, err)
24 }
25 }
26
27 for {
28 s.logger.Tracef("reading message from topic '%s'", s.Config.Topic)
29
30 m, err := s.Reader.ReadMessage(ctx)
31 if err != nil {
32 if errors.Is(err, io.EOF) {
33 return nil
34 }
35
36 s.logger.Errorln(fmt.Errorf("while reading %s message: %w", s.GetName(), err))
37
38 continue
39 }
40
41 s.logger.Tracef("got message: %s", string(m.Value))
42 l := pipeline.Line{
43 Raw: string(m.Value),
44 Labels: s.Config.Labels,
45 Time: m.Time.UTC(),
46 Src: s.Config.Topic,
47 Process: true,
48 Module: s.GetName(),
49 }
50 s.logger.Tracef("line with message read from topic '%s': %+v", s.Config.Topic, l)
51
52 if s.metricsLevel != metrics.AcquisitionMetricsLevelNone {
53 metrics.KafkaDataSourceLinesRead.With(prometheus.Labels{"topic": s.Config.Topic, "datasource_type": ModuleName, "acquis_type": l.Labels["type"]}).Inc()
54 }
55
56 evt := pipeline.MakeEvent(s.Config.UseTimeMachine, pipeline.LOG, true)
57 evt.Line = l
58
59 out <- evt
60 }
61}
62
63func (s *Source) RunReader(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error {
64 s.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", s.GetName(), s.Config)

Callers 1

RunReaderMethod · 0.95

Calls 5

GetNameMethod · 0.95
MakeEventFunction · 0.92
TracefMethod · 0.80
IncMethod · 0.80
WithMethod · 0.45

Tested by

no test coverage detected