OneShotAcquisition reads a set of file and returns when done
(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb)
| 41 | |
| 42 | // OneShotAcquisition reads a set of file and returns when done |
| 43 | func (d *Source) OneShotAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error { |
| 44 | d.logger.Debug("In oneshot") |
| 45 | |
| 46 | runningContainers, err := d.Client.ContainerList(ctx, client.ContainerListOptions{}) |
| 47 | if err != nil { |
| 48 | return err |
| 49 | } |
| 50 | |
| 51 | foundOne := false |
| 52 | |
| 53 | for _, container := range runningContainers.Items { |
| 54 | if _, ok := d.runningContainerState.Get(container.ID); ok { |
| 55 | d.logger.Debugf("container with id %s is already being read from", container.ID) |
| 56 | continue |
| 57 | } |
| 58 | |
| 59 | if containerConfig := d.EvalContainer(ctx, container); containerConfig != nil { |
| 60 | d.logger.Infof("reading logs from container %s", containerConfig.Name) |
| 61 | d.logger.Debugf("logs options: %+v", *d.containerLogsOptions) |
| 62 | |
| 63 | dockerReader, err := d.Client.ContainerLogs(ctx, containerConfig.ID, *d.containerLogsOptions) |
| 64 | if err != nil { |
| 65 | d.logger.Errorf("unable to read logs from container: %+v", err) |
| 66 | return err |
| 67 | } |
| 68 | |
| 69 | // we use this library to normalize docker API logs (cf. https://ahmet.im/blog/docker-logs-api-binary-format-explained/) |
| 70 | foundOne = true |
| 71 | |
| 72 | var scanner *bufio.Scanner |
| 73 | |
| 74 | if containerConfig.Tty { |
| 75 | scanner = bufio.NewScanner(dockerReader) |
| 76 | } else { |
| 77 | reader := dlog.NewReader(dockerReader) |
| 78 | scanner = bufio.NewScanner(reader) |
| 79 | } |
| 80 | |
| 81 | for scanner.Scan() { |
| 82 | select { |
| 83 | case <-t.Dying(): |
| 84 | d.logger.Infof("Shutting down reader for container %s", containerConfig.Name) |
| 85 | default: |
| 86 | line := scanner.Text() |
| 87 | if line == "" { |
| 88 | continue |
| 89 | } |
| 90 | |
| 91 | l := pipeline.Line{} |
| 92 | l.Raw = line |
| 93 | l.Labels = d.Config.Labels |
| 94 | l.Time = time.Now().UTC() |
| 95 | l.Src = containerConfig.Name |
| 96 | l.Process = true |
| 97 | l.Module = d.GetName() |
| 98 | |
| 99 | if d.metricsLevel != metrics.AcquisitionMetricsLevelNone { |
| 100 | metrics.DockerDatasourceLinesRead.With(prometheus.Labels{"source": containerConfig.Name, "acquis_type": l.Labels["type"], "datasource_type": ModuleName}).Inc() |