MCPcopy
hub / github.com/crowdsecurity/crowdsec / tailContainerAttempt

Method tailContainerAttempt

pkg/acquisition/modules/docker/run.go:623–686  ·  view source on GitHub ↗
(ctx context.Context, container *ContainerConfig, outChan chan pipeline.Event, bo backoff.BackOff)

Source from the content-addressed store, hash-verified

621}
622
623func (d *Source) tailContainerAttempt(ctx context.Context, container *ContainerConfig, outChan chan pipeline.Event, bo backoff.BackOff) error {
624 dockerReader, err := d.Client.ContainerLogs(ctx, container.ID, *container.logOptions)
625 if err != nil {
626 return fmt.Errorf("unable to read logs from container %s: %w", container.Name, err)
627 }
628
629 // Log connection (both initial and reconnections)
630 container.logger.Info("connected to container logs")
631
632 // reset backoff so for the next disconnect, the interval doesn't start from 30sec
633 bo.Reset()
634
635 var scanner *bufio.Scanner
636 // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/)
637 if container.Tty {
638 scanner = bufio.NewScanner(dockerReader)
639 } else {
640 reader := dlog.NewReader(dockerReader)
641 scanner = bufio.NewScanner(reader)
642 }
643
644 readerChan := make(chan string)
645 readerTomb := &tomb.Tomb{}
646 readerTomb.Go(func() error {
647 return ReadTailScanner(scanner, readerChan, readerTomb)
648 })
649
650 for {
651 select {
652 case <-container.t.Dying():
653 readerTomb.Kill(nil)
654 return nil
655 case line := <-readerChan:
656 if line == "" {
657 continue
658 }
659
660 l := pipeline.Line{}
661 l.Raw = line
662 l.Labels = container.Labels
663 l.Time = time.Now().UTC()
664 l.Src = container.Name
665 l.Process = true
666 l.Module = d.GetName()
667 evt := pipeline.MakeEvent(d.Config.UseTimeMachine, pipeline.LOG, true)
668 evt.Line = l
669
670 if d.metricsLevel != metrics.AcquisitionMetricsLevelNone {
671 metrics.DockerDatasourceLinesRead.With(prometheus.Labels{"source": container.Name, "datasource_type": ModuleName, "acquis_type": evt.Line.Labels["type"]}).Inc()
672 }
673
674 outChan <- evt
675
676 d.logger.Debugf("Sent line to parsing: %+v", evt.Line.Raw)
677 case <-readerTomb.Dying():
678 // This case is to handle temporarily losing the connection to the docker socket
679 // The only known case currently is when using docker-socket-proxy (and maybe a docker daemon restart)
680 container.logger.Debugf("readerTomb dying, connection lost")

Callers 1

TailContainer (Method) · 0.95

Calls 10

GetName (Method) · 0.95
MakeEvent (Function) · 0.92
ReadTailScanner (Function) · 0.85
ContainerLogs (Method) · 0.80
NewReader (Method) · 0.80
Kill (Method) · 0.80
Inc (Method) · 0.80
Info (Method) · 0.45
Reset (Method) · 0.45
With (Method) · 0.45

Tested by

no test coverage detected