MCPcopy
hub / github.com/crowdsecurity/crowdsec / Watch

Method Watch

pkg/acquisition/modules/docker/run.go:420–488  ·  view source on GitHub ↗
(ctx context.Context, containerChan chan *ContainerConfig, containerDeleteChan chan *ContainerConfig, serviceChan chan *ContainerConfig, serviceDeleteChan chan *ContainerConfig)

Source from the content-addressed store, hash-verified

418}
419
420func (d *Source) Watch(ctx context.Context, containerChan chan *ContainerConfig, containerDeleteChan chan *ContainerConfig, serviceChan chan *ContainerConfig, serviceDeleteChan chan *ContainerConfig) error {
421 err := d.checkContainers(ctx, containerChan, containerDeleteChan)
422 if err != nil {
423 return err
424 }
425
426 if d.isSwarmManager {
427 err = d.checkServices(ctx, serviceChan, serviceDeleteChan)
428 if err != nil {
429 return err
430 }
431 }
432
433 sub, err := d.subscribeEvents(ctx)
434 if err != nil {
435 return err
436 }
437
438 for {
439 select {
440 case <-d.t.Dying():
441 d.logger.Infof("stopping container watcher")
442 return nil
443
444 case event := <-sub.events:
445 d.logger.Tracef("Received event: %+v", event)
446
447 if event.Type == dockerTypesEvents.ServiceEventType && (event.Action == dockerTypesEvents.ActionCreate || event.Action == dockerTypesEvents.ActionRemove) {
448 if err := d.checkServices(ctx, serviceChan, serviceDeleteChan); err != nil {
449 d.logger.Warnf("Failed to check services: %v", err)
450 }
451 }
452
453 if event.Type == dockerTypesEvents.ContainerEventType && (event.Action == dockerTypesEvents.ActionStart || event.Action == dockerTypesEvents.ActionDie) {
454 if err := d.checkContainers(ctx, containerChan, containerDeleteChan); err != nil {
455 d.logger.Warnf("Failed to check containers: %v", err)
456 }
457 }
458 case err := <-sub.errs:
459 if err == nil {
460 continue
461 }
462
463 d.logger.Errorf("Docker events error: %v", err)
464
465 // try to reconnect, replacing our channels on success. They are never nil if err is nil.
466 newSub, recErr := d.subscribeEvents(ctx)
467 if recErr != nil {
468 return recErr
469 }
470
471 sub = newSub
472
473 d.logger.Info("Successfully reconnected to Docker events")
474
475 // We check containers after a reconnection because the docker daemon might have restarted
476 // and the container tombs may have self deleted
477 if err := d.checkContainers(ctx, containerChan, containerDeleteChan); err != nil {

Callers 1

StreamingAcquisition (Method) · 0.95

Calls 5

checkContainers (Method) · 0.95
checkServices (Method) · 0.95
subscribeEvents (Method) · 0.95
Tracef (Method) · 0.80
Info (Method) · 0.45

Tested by

no test coverage detected