run each rule in parallel and wait for them to complete
()
| 90 | |
| 91 | // run each rule in parallel and wait for them to complete |
| 92 | func (p *Plugin) runRules() { |
| 93 | klog.V(3).Info("Start to run custom plugins") |
| 94 | |
| 95 | for _, rule := range p.config.Rules { |
| 96 | // syncChan limits concurrent goroutines to configured PluginGlobalConfig.Concurrency value |
| 97 | p.syncChan <- struct{}{} |
| 98 | p.Add(1) |
| 99 | go func(rule *cpmtypes.CustomRule) { |
| 100 | defer p.Done() |
| 101 | defer func() { |
| 102 | <-p.syncChan |
| 103 | }() |
| 104 | |
| 105 | start := time.Now() |
| 106 | exitStatus, message := p.run(*rule) |
| 107 | level := klog.Level(3) |
| 108 | if exitStatus != 0 { |
| 109 | level = klog.Level(2) |
| 110 | } |
| 111 | |
| 112 | klog.V(level).Infof("Rule: %+v. Start time: %v. End time: %v. Duration: %v", rule, start, time.Now(), time.Since(start)) |
| 113 | |
| 114 | result := cpmtypes.Result{ |
| 115 | Rule: rule, |
| 116 | ExitStatus: exitStatus, |
| 117 | Message: message, |
| 118 | } |
| 119 | |
| 120 | // pipes result into resultChan which customPluginMonitor instance generates status from |
| 121 | p.resultChan <- result |
| 122 | |
| 123 | // Let the result be logged at a higher verbosity level. If there is a change in status it is logged later. |
| 124 | klog.V(level).Infof("Add check result %+v for rule %+v", result, rule) |
| 125 | }(rule) |
| 126 | } |
| 127 | |
| 128 | p.Wait() |
| 129 | klog.V(3).Info("Finish running custom plugins") |
| 130 | } |
| 131 | |
| 132 | // readFromReader reads the maxBytes from the reader and drains the rest. |
| 133 | func readFromReader(reader io.ReadCloser, maxBytes int64) ([]byte, error) { |