(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan pipeline.Event)
| 844 | } |
| 845 | |
| 846 | func (d *Source) ServiceManager(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan pipeline.Event) error { |
| 847 | d.logger.Info("Service Manager started") |
| 848 | |
| 849 | for { |
| 850 | select { |
| 851 | case newService := <-in: |
| 852 | if _, ok := d.runningServiceState.Get(newService.ID); !ok { |
| 853 | newService.logger = d.logger.WithField("service_name", newService.Name) |
| 854 | newService.t.Go(func() error { |
| 855 | return d.TailService(ctx, newService, outChan, deleteChan) |
| 856 | }) |
| 857 | |
| 858 | d.runningServiceState.Set(newService.ID, newService) |
| 859 | } |
| 860 | case serviceToDelete := <-deleteChan: |
| 861 | if serviceConfig, ok := d.runningServiceState.Get(serviceToDelete.ID); ok { |
| 862 | d.logger.Infof("service acquisition stopped for service '%s'", serviceConfig.Name) |
| 863 | serviceConfig.t.Kill(nil) |
| 864 | d.runningServiceState.Delete(serviceToDelete.ID) |
| 865 | } |
| 866 | case <-d.t.Dying(): |
| 867 | for _, service := range d.runningServiceState.GetAll() { |
| 868 | if service.t.Alive() { |
| 869 | d.logger.Infof("killing tail for service %s", service.Name) |
| 870 | service.t.Kill(nil) |
| 871 | |
| 872 | if err := service.t.Wait(); err != nil { |
| 873 | d.logger.Infof("error while waiting for death of %s : %s", service.Name, err) |
| 874 | } |
| 875 | } |
| 876 | } |
| 877 | |
| 878 | d.runningServiceState = nil |
| 879 | d.logger.Debugf("service manager cleanup done, return") |
| 880 | |
| 881 | return nil |
| 882 | } |
| 883 | } |
| 884 | } |
no test coverage detected