MCPcopy
hub / github.com/apache/devlake / StreamProcess

Function StreamProcess

backend/core/utils/ipc.go:162–210  ·  view source on GitHub ↗

StreamProcess runs the cmd and returns its output, line by line, on a channel. The converter functor lets you convert the incoming raw data to your custom data type T. This is a non-blocking function.

(cmd *exec.Cmd, opts *StreamProcessOptions)

Source from the content-addressed store, hash-verified

160// StreamProcess runs the cmd and returns its output on a line-by-line basis, on a channel. The converter functor will allow you
161// to convert the incoming raw to your custom data type T. This is a nonblocking function.
162func StreamProcess(cmd *exec.Cmd, opts *StreamProcessOptions) (*ProcessStream, errors.Error) {
163 if opts == nil {
164 opts = &StreamProcessOptions{}
165 }
166 cmd.Env = append(cmd.Env, os.Environ()...)
167 pipes, err := getPipes(cmd, opts)
168 if err != nil {
169 return nil, err
170 }
171 if err = errors.Convert(cmd.Start()); err != nil {
172 return nil, err
173 }
174 receiveStream := make(chan *ProcessResponse, 32)
175 wg := &sync.WaitGroup{}
176 stdScanner := scanOutputPipe(pipes.stdout, wg, opts.OnStdout, func(result []byte) *ProcessResponse {
177 return &ProcessResponse{stdout: result}
178 }, receiveStream)
179 errScanner, remoteErrorMsg := scanErrorPipe(pipes.stderr, opts.OnStderr, receiveStream)
180 fdOutScanner := scanOutputPipe(pipes.fdOut, wg, opts.OnFdOut, func(result []byte) *ProcessResponse {
181 return &ProcessResponse{fdOut: result}
182 }, receiveStream)
183 wg.Add(2)
184 if pipes.fdOut != nil {
185 wg.Add(1)
186 }
187 go stdScanner()
188 go errScanner()
189 if pipes.fdOut != nil {
190 go fdOutScanner()
191 }
192 processStream := &ProcessStream{
193 process: cmd.Process,
194 receiveChannel: receiveStream,
195 }
196 go func() {
197 defer pipes.close()
198 if err = errors.Convert(cmd.Wait()); err != nil {
199 if !processStream.cancelled {
200 receiveStream <- &ProcessResponse{err: errors.Default.Wrap(err, fmt.Sprintf("remote error response:\n%s", remoteErrorMsg))}
201 }
202 }
203 wg.Done()
204 }()
205 go func() {
206 defer close(receiveStream)
207 wg.Wait()
208 }()
209 return processStream, nil
210}
211
212func getPipes(cmd *exec.Cmd, opts *StreamProcessOptions) (*processPipes, errors.Error) {
213 stdout, err := cmd.StdoutPipe()

Callers 1

RunProcessFunction · 0.85

Calls 8

getPipesFunction · 0.85
scanOutputPipeFunction · 0.85
scanErrorPipeFunction · 0.85
closeMethod · 0.80
WaitMethod · 0.80
WrapMethod · 0.80
ConvertMethod · 0.45
AddMethod · 0.45

Tested by

no test coverage detected