MCPcopy
hub / github.com/crowdsecurity/crowdsec / ServiceManager

Method ServiceManager

pkg/acquisition/modules/docker/run.go:846–884  ·  view source on GitHub ↗
(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan pipeline.Event)

Source from the content-addressed store, hash-verified

844}
845
846func (d *Source) ServiceManager(ctx context.Context, in chan *ContainerConfig, deleteChan chan *ContainerConfig, outChan chan pipeline.Event) error {
847 d.logger.Info("Service Manager started")
848
849 for {
850 select {
851 case newService := <-in:
852 if _, ok := d.runningServiceState.Get(newService.ID); !ok {
853 newService.logger = d.logger.WithField("service_name", newService.Name)
854 newService.t.Go(func() error {
855 return d.TailService(ctx, newService, outChan, deleteChan)
856 })
857
858 d.runningServiceState.Set(newService.ID, newService)
859 }
860 case serviceToDelete := <-deleteChan:
861 if serviceConfig, ok := d.runningServiceState.Get(serviceToDelete.ID); ok {
862 d.logger.Infof("service acquisition stopped for service '%s'", serviceConfig.Name)
863 serviceConfig.t.Kill(nil)
864 d.runningServiceState.Delete(serviceToDelete.ID)
865 }
866 case <-d.t.Dying():
867 for _, service := range d.runningServiceState.GetAll() {
868 if service.t.Alive() {
869 d.logger.Infof("killing tail for service %s", service.Name)
870 service.t.Kill(nil)
871
872 if err := service.t.Wait(); err != nil {
873 d.logger.Infof("error while waiting for death of %s : %s", service.Name, err)
874 }
875 }
876 }
877
878 d.runningServiceState = nil
879 d.logger.Debugf("service manager cleanup done, return")
880
881 return nil
882 }
883 }
884}

Callers 1

StreamingAcquisitionMethod · 0.95

Calls 7

TailServiceMethod · 0.95
KillMethod · 0.80
GetAllMethod · 0.80
InfoMethod · 0.45
GetMethod · 0.45
SetMethod · 0.45
DeleteMethod · 0.45

Tested by

no test coverage detected