MCPcopy
hub / github.com/crowdsecurity/crowdsec / tailServiceAttempt

Method tailServiceAttempt

pkg/acquisition/modules/docker/run.go:736–804  ·  view source on GitHub ↗
(ctx context.Context, service *ContainerConfig, outChan chan pipeline.Event, bo backoff.BackOff)

Source from the content-addressed store, hash-verified

734}
735
736func (d *Source) tailServiceAttempt(ctx context.Context, service *ContainerConfig, outChan chan pipeline.Event, bo backoff.BackOff) error {
737 // For services, we need to get the service logs using the service logs API
738 // Docker service logs aggregates logs from all running tasks of the service
739 logOptions := client.ServiceLogsOptions{
740 ShowStdout: service.logOptions.ShowStdout,
741 ShowStderr: service.logOptions.ShowStderr,
742 Since: service.logOptions.Since,
743 Until: service.logOptions.Until,
744 Timestamps: service.logOptions.Timestamps,
745 Follow: service.logOptions.Follow,
746 Tail: service.logOptions.Tail,
747 Details: service.logOptions.Details,
748 }
749 dockerReader, err := d.Client.ServiceLogs(ctx, service.ID, logOptions)
750 if err != nil {
751 return fmt.Errorf("unable to read logs from service %s: %w", service.Name, err)
752 }
753
754 // Log connection (both initial and reconnections)
755 service.logger.Info("connected to service logs")
756
757 bo.Reset()
758
759 // Service logs don't use TTY, so we always use the dlog reader
760 reader := dlog.NewReader(dockerReader)
761 scanner := bufio.NewScanner(reader)
762
763 readerChan := make(chan string)
764 readerTomb := &tomb.Tomb{}
765 readerTomb.Go(func() error {
766 return ReadTailScanner(scanner, readerChan, readerTomb)
767 })
768
769 for {
770 select {
771 case <-service.t.Dying():
772 readerTomb.Kill(nil)
773 return nil
774 case line := <-readerChan:
775 if line == "" {
776 continue
777 }
778
779 l := pipeline.Line{}
780 l.Raw = line
781 l.Labels = service.Labels
782 l.Time = time.Now().UTC()
783 l.Src = service.Name
784 l.Process = true
785 l.Module = d.GetName()
786 evt := pipeline.MakeEvent(d.Config.UseTimeMachine, pipeline.LOG, true)
787 evt.Line = l
788
789 if d.metricsLevel != metrics.AcquisitionMetricsLevelNone {
790 metrics.DockerDatasourceLinesRead.With(prometheus.Labels{"source": service.Name, "acquis_type": l.Labels["type"], "datasource_type": ModuleName}).Inc()
791 }
792
793 outChan <- evt

Callers 1

TailService (Method) · 0.95

Calls 10

GetName (Method) · 0.95
MakeEvent (Function) · 0.92
ReadTailScanner (Function) · 0.85
ServiceLogs (Method) · 0.80
NewReader (Method) · 0.80
Kill (Method) · 0.80
Inc (Method) · 0.80
Info (Method) · 0.45
Reset (Method) · 0.45
With (Method) · 0.45

Tested by

no test coverage detected