(ctx context.Context, container *ContainerConfig, outChan chan pipeline.Event, bo backoff.BackOff)
| 621 | } |
| 622 | |
| 623 | func (d *Source) tailContainerAttempt(ctx context.Context, container *ContainerConfig, outChan chan pipeline.Event, bo backoff.BackOff) error { |
| 624 | dockerReader, err := d.Client.ContainerLogs(ctx, container.ID, *container.logOptions) |
| 625 | if err != nil { |
| 626 | return fmt.Errorf("unable to read logs from container %s: %w", container.Name, err) |
| 627 | } |
| 628 | |
| 629 | // Log connection (both initial and reconnections) |
| 630 | container.logger.Info("connected to container logs") |
| 631 | |
| 632 | // reset backoff so for the next disconnect, the interval doesn't start from 30sec |
| 633 | bo.Reset() |
| 634 | |
| 635 | var scanner *bufio.Scanner |
| 636 | // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/) |
| 637 | if container.Tty { |
| 638 | scanner = bufio.NewScanner(dockerReader) |
| 639 | } else { |
| 640 | reader := dlog.NewReader(dockerReader) |
| 641 | scanner = bufio.NewScanner(reader) |
| 642 | } |
| 643 | |
| 644 | readerChan := make(chan string) |
| 645 | readerTomb := &tomb.Tomb{} |
| 646 | readerTomb.Go(func() error { |
| 647 | return ReadTailScanner(scanner, readerChan, readerTomb) |
| 648 | }) |
| 649 | |
| 650 | for { |
| 651 | select { |
| 652 | case <-container.t.Dying(): |
| 653 | readerTomb.Kill(nil) |
| 654 | return nil |
| 655 | case line := <-readerChan: |
| 656 | if line == "" { |
| 657 | continue |
| 658 | } |
| 659 | |
| 660 | l := pipeline.Line{} |
| 661 | l.Raw = line |
| 662 | l.Labels = container.Labels |
| 663 | l.Time = time.Now().UTC() |
| 664 | l.Src = container.Name |
| 665 | l.Process = true |
| 666 | l.Module = d.GetName() |
| 667 | evt := pipeline.MakeEvent(d.Config.UseTimeMachine, pipeline.LOG, true) |
| 668 | evt.Line = l |
| 669 | |
| 670 | if d.metricsLevel != metrics.AcquisitionMetricsLevelNone { |
| 671 | metrics.DockerDatasourceLinesRead.With(prometheus.Labels{"source": container.Name, "datasource_type": ModuleName, "acquis_type": evt.Line.Labels["type"]}).Inc() |
| 672 | } |
| 673 | |
| 674 | outChan <- evt |
| 675 | |
| 676 | d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw) |
| 677 | case <-readerTomb.Dying(): |
| 678 | // This case is to handle temporarily losing the connection to the docker socket |
| 679 | // The only known case currently is when using docker-socket-proxy (and maybe a docker daemon restart) |
| 680 | container.logger.Debugf("readerTomb dying, connection lost") |
no test coverage detected