| 488 | } |
| 489 | |
| 490 | func (d *Source) StreamingAcquisition(ctx context.Context, out chan pipeline.Event, t *tomb.Tomb) error { |
| 491 | d.t = t |
| 492 | containerChan := make(chan *ContainerConfig) |
| 493 | containerDeleteChan := make(chan *ContainerConfig) |
| 494 | serviceChan := make(chan *ContainerConfig) |
| 495 | serviceDeleteChan := make(chan *ContainerConfig) |
| 496 | |
| 497 | d.logger.Infof("Starting docker acquisition") |
| 498 | |
| 499 | t.Go(func() error { |
| 500 | return d.ContainerManager(ctx, containerChan, containerDeleteChan, out) |
| 501 | }) |
| 502 | |
| 503 | if d.isSwarmManager { |
| 504 | t.Go(func() error { |
| 505 | return d.ServiceManager(ctx, serviceChan, serviceDeleteChan, out) |
| 506 | }) |
| 507 | } |
| 508 | |
| 509 | return d.Watch(ctx, containerChan, containerDeleteChan, serviceChan, serviceDeleteChan) |
| 510 | } |
| 511 | |
| 512 | func ReadTailScanner(scanner *bufio.Scanner, out chan string, t *tomb.Tomb) error { |
| 513 | for scanner.Scan() { |