| 17 | ) |
| 18 | |
| 19 | func (s *Source) ReadMessage(ctx context.Context, out chan pipeline.Event) error { |
| 20 | if s.Config.GroupID == "" { |
| 21 | err := s.Reader.SetOffset(kafka.LastOffset) |
| 22 | if err != nil { |
| 23 | return fmt.Errorf("while setting offset for reader on topic '%s': %w", s.Config.Topic, err) |
| 24 | } |
| 25 | } |
| 26 | |
| 27 | for { |
| 28 | s.logger.Tracef("reading message from topic '%s'", s.Config.Topic) |
| 29 | |
| 30 | m, err := s.Reader.ReadMessage(ctx) |
| 31 | if err != nil { |
| 32 | if errors.Is(err, io.EOF) { |
| 33 | return nil |
| 34 | } |
| 35 | |
| 36 | s.logger.Errorln(fmt.Errorf("while reading %s message: %w", s.GetName(), err)) |
| 37 | |
| 38 | continue |
| 39 | } |
| 40 | |
| 41 | s.logger.Tracef("got message: %s", string(m.Value)) |
| 42 | l := pipeline.Line{ |
| 43 | Raw: string(m.Value), |
| 44 | Labels: s.Config.Labels, |
| 45 | Time: m.Time.UTC(), |
| 46 | Src: s.Config.Topic, |
| 47 | Process: true, |
| 48 | Module: s.GetName(), |
| 49 | } |
| 50 | s.logger.Tracef("line with message read from topic '%s': %+v", s.Config.Topic, l) |
| 51 | |
| 52 | if s.metricsLevel != metrics.AcquisitionMetricsLevelNone { |
| 53 | metrics.KafkaDataSourceLinesRead.With(prometheus.Labels{"topic": s.Config.Topic, "datasource_type": ModuleName, "acquis_type": l.Labels["type"]}).Inc() |
| 54 | } |
| 55 | |
| 56 | evt := pipeline.MakeEvent(s.Config.UseTimeMachine, pipeline.LOG, true) |
| 57 | evt.Line = l |
| 58 | |
| 59 | out <- evt |
| 60 | } |
| 61 | } |
| 62 | |
| 63 | func (s *Source) RunReader(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error { |
| 64 | s.logger.Debugf("starting %s datasource reader goroutine with configuration %+v", s.GetName(), s.Config) |