MCPcopy
hub / github.com/kubernetes/test-infra / executeParallelCommand

Method executeParallelCommand

kubetest/process/process.go:230–287  ·  view source on GitHub ↗

executeParallelCommand executes the given command and sends its output and error over a channel.

(cmd *exec.Cmd, resChan chan cmdExecResult, termChan, intChan chan struct{})

Source from the content-addressed store, hash-verified

228
229// executeParallelCommand executes a given command and send output and error via channel
230func (c *Control) executeParallelCommand(cmd *exec.Cmd, resChan chan cmdExecResult, termChan, intChan chan struct{}) {
231 stepName := strings.Join(cmd.Args, " ")
232 stdout := bytes.Buffer{}
233 cmd.Stdout = &stdout
234 cmd.Stderr = &stdout
235
236 start := time.Now()
237 log.Printf("Running: %v in parallel", stepName)
238
239 if c.isTerminated() {
240 resChan <- cmdExecResult{stepName: stepName, output: stdout.String(), execTime: time.Since(start), err: fmt.Errorf("skipped %s (kubetest is terminated)", stepName)}
241 return
242 }
243
244 cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
245 if err := cmd.Start(); err != nil {
246 resChan <- cmdExecResult{stepName: stepName, output: stdout.String(), execTime: time.Since(start), err: fmt.Errorf("error starting %v: %w", stepName, err)}
247 return
248 }
249
250 finished := make(chan error)
251 go func() {
252 finished <- cmd.Wait()
253 }()
254
255 for {
256 select {
257 case err := <-finished:
258 if err != nil {
259 var suffix string
260 if c.isTerminated() {
261 suffix = " (terminated)"
262 } else if c.isInterrupted() {
263 suffix = " (interrupted)"
264 }
265 err = fmt.Errorf("error during %s%s: %w", stepName, suffix, err)
266 }
267 resChan <- cmdExecResult{stepName: stepName, output: stdout.String(), execTime: time.Since(start), err: err}
268 return
269
270 case <-termChan:
271 pgid := getGroupPid(cmd.Process.Pid)
272 syscall.Kill(-pgid, syscall.SIGKILL)
273 if err := cmd.Process.Kill(); err != nil {
274 log.Printf("Failed to terminate %s (terminated 15m after interrupt): %v", strings.Join(cmd.Args, " "), err)
275 }
276
277 case <-intChan:
278 log.Printf("Abort after %s timeout during %s. Will terminate in another 15m", c.Timeout, strings.Join(cmd.Args, " "))
279 pgid := getGroupPid(cmd.Process.Pid)
280 if err := syscall.Kill(-pgid, syscall.SIGABRT); err != nil {
281 log.Printf("Failed to abort %s. Will terminate immediately: %v", strings.Join(cmd.Args, " "), err)
282 syscall.Kill(-pgid, syscall.SIGTERM)
283 cmd.Process.Kill()
284 }
285 }
286 }
287}

Callers 1

FinishRunningParallelMethod · 0.95

Calls 5

isTerminatedMethod · 0.95
isInterruptedMethod · 0.95
makeFunction · 0.85
getGroupPidFunction · 0.85
StringMethod · 0.45

Tested by

no test coverage detected