(ctx context.Context, service *ContainerConfig, outChan chan pipeline.Event, bo backoff.BackOff)
| 734 | } |
| 735 | |
| 736 | func (d *Source) tailServiceAttempt(ctx context.Context, service *ContainerConfig, outChan chan pipeline.Event, bo backoff.BackOff) error { |
| 737 | // For services, we need to get the service logs using the service logs API |
| 738 | // Docker service logs aggregates logs from all running tasks of the service |
| 739 | logOptions := client.ServiceLogsOptions{ |
| 740 | ShowStdout: service.logOptions.ShowStdout, |
| 741 | ShowStderr: service.logOptions.ShowStderr, |
| 742 | Since: service.logOptions.Since, |
| 743 | Until: service.logOptions.Until, |
| 744 | Timestamps: service.logOptions.Timestamps, |
| 745 | Follow: service.logOptions.Follow, |
| 746 | Tail: service.logOptions.Tail, |
| 747 | Details: service.logOptions.Details, |
| 748 | } |
| 749 | dockerReader, err := d.Client.ServiceLogs(ctx, service.ID, logOptions) |
| 750 | if err != nil { |
| 751 | return fmt.Errorf("unable to read logs from service %s: %w", service.Name, err) |
| 752 | } |
| 753 | |
| 754 | // Log connection (both initial and reconnections) |
| 755 | service.logger.Info("connected to service logs") |
| 756 | |
| 757 | bo.Reset() |
| 758 | |
| 759 | // Service logs don't use TTY, so we always use the dlog reader |
| 760 | reader := dlog.NewReader(dockerReader) |
| 761 | scanner := bufio.NewScanner(reader) |
| 762 | |
| 763 | readerChan := make(chan string) |
| 764 | readerTomb := &tomb.Tomb{} |
| 765 | readerTomb.Go(func() error { |
| 766 | return ReadTailScanner(scanner, readerChan, readerTomb) |
| 767 | }) |
| 768 | |
| 769 | for { |
| 770 | select { |
| 771 | case <-service.t.Dying(): |
| 772 | readerTomb.Kill(nil) |
| 773 | return nil |
| 774 | case line := <-readerChan: |
| 775 | if line == "" { |
| 776 | continue |
| 777 | } |
| 778 | |
| 779 | l := pipeline.Line{} |
| 780 | l.Raw = line |
| 781 | l.Labels = service.Labels |
| 782 | l.Time = time.Now().UTC() |
| 783 | l.Src = service.Name |
| 784 | l.Process = true |
| 785 | l.Module = d.GetName() |
| 786 | evt := pipeline.MakeEvent(d.Config.UseTimeMachine, pipeline.LOG, true) |
| 787 | evt.Line = l |
| 788 | |
| 789 | if d.metricsLevel != metrics.AcquisitionMetricsLevelNone { |
| 790 | metrics.DockerDatasourceLinesRead.With(prometheus.Labels{"source": service.Name, "acquis_type": l.Labels["type"], "datasource_type": ModuleName}).Inc() |
| 791 | } |
| 792 | |
| 793 | outChan <- evt |
no test coverage detected