(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan pipeline.Event)
| 804 | } |
| 805 | |
| 806 | func (d *Source) ContainerManager(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan pipeline.Event) error { |
| 807 | d.logger.Info("Container Manager started") |
| 808 | |
| 809 | for { |
| 810 | select { |
| 811 | case newContainer := <-in: |
| 812 | if _, ok := d.runningContainerState.Get(newContainer.ID); !ok { |
| 813 | newContainer.logger = d.logger.WithField("container_name", newContainer.Name) |
| 814 | newContainer.t.Go(func() error { |
| 815 | return d.TailContainer(ctx, newContainer, outChan, deleteChan) |
| 816 | }) |
| 817 | |
| 818 | d.runningContainerState.Set(newContainer.ID, newContainer) |
| 819 | } |
| 820 | case containerToDelete := <-deleteChan: |
| 821 | if containerConfig, ok := d.runningContainerState.Get(containerToDelete.ID); ok { |
| 822 | log.Infof("container acquisition stopped for container '%s'", containerConfig.Name) |
| 823 | containerConfig.t.Kill(nil) |
| 824 | d.runningContainerState.Delete(containerToDelete.ID) |
| 825 | } |
| 826 | case <-d.t.Dying(): |
| 827 | for _, container := range d.runningContainerState.GetAll() { |
| 828 | if container.t.Alive() { |
| 829 | d.logger.Infof("killing tail for container %s", container.Name) |
| 830 | container.t.Kill(nil) |
| 831 | |
| 832 | if err := container.t.Wait(); err != nil { |
| 833 | d.logger.Infof("error while waiting for death of %s : %s", container.Name, err) |
| 834 | } |
| 835 | } |
| 836 | } |
| 837 | |
| 838 | d.runningContainerState = nil |
| 839 | d.logger.Debugf("routine cleanup done, return") |
| 840 | |
| 841 | return nil |
| 842 | } |
| 843 | } |
| 844 | } |
| 845 | |
| 846 | func (d *Source) ServiceManager(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan pipeline.Event) error { |
| 847 | d.logger.Info("Service Manager started") |
no test coverage detected