| 292 | } |
| 293 | |
| 294 | func (s *Source) tailFile(out chan pipeline.Event, t *tomb.Tomb, tail *tail.Tail) error { |
| 295 | logger := s.logger.WithField("tail", tail.Filename) |
| 296 | logger.Debug("-> start tailing") |
| 297 | |
| 298 | for { |
| 299 | select { |
| 300 | case <-t.Dying(): |
| 301 | logger.Info("File datasource stopping") |
| 302 | |
| 303 | if err := tail.Stop(); err != nil { |
| 304 | s.logger.Errorf("error in stop : %s", err) |
| 305 | return err |
| 306 | } |
| 307 | |
| 308 | return nil |
| 309 | case <-tail.Dying(): // our tailer is dying |
| 310 | errMsg := "file reader died" |
| 311 | |
| 312 | err := tail.Err() |
| 313 | if err != nil { |
| 314 | errMsg = fmt.Sprintf(errMsg+" : %s", err) |
| 315 | } |
| 316 | |
| 317 | logger.Warning(errMsg) |
| 318 | |
| 319 | // Just remove the dead tailer from our map and return |
| 320 | // monitorNewFiles will pick up the file again if it's recreated |
| 321 | s.tailMapMutex.Lock() |
| 322 | delete(s.tails, tail.Filename) |
| 323 | s.tailMapMutex.Unlock() |
| 324 | |
| 325 | return nil |
| 326 | case line := <-tail.Lines: |
| 327 | if line == nil { |
| 328 | logger.Warning("tail is empty") |
| 329 | continue |
| 330 | } |
| 331 | |
| 332 | if line.Err != nil { |
| 333 | logger.Warningf("fetch error : %v", line.Err) |
| 334 | return line.Err |
| 335 | } |
| 336 | |
| 337 | if line.Text == "" { // skip empty lines |
| 338 | continue |
| 339 | } |
| 340 | |
| 341 | if s.metricsLevel != metrics.AcquisitionMetricsLevelNone { |
| 342 | metrics.FileDatasourceLinesRead.With(prometheus.Labels{"source": tail.Filename, "datasource_type": ModuleName, "acquis_type": s.config.Labels["type"]}).Inc() |
| 343 | } |
| 344 | |
| 345 | src := tail.Filename |
| 346 | if s.metricsLevel == metrics.AcquisitionMetricsLevelAggregated { |
| 347 | src = filepath.Base(tail.Filename) |
| 348 | } |
| 349 | |
| 350 | l := pipeline.Line{ |
| 351 | Raw: trimLine(line.Text), |