StreamProcess runs cmd and returns its output, line by line, on a channel. The converter functor lets you convert the incoming raw data to your custom data type T. This is a non-blocking function.
(cmd *exec.Cmd, opts *StreamProcessOptions)
| 160 | // StreamProcess runs the cmd and returns its output on a line-by-line basis, on a channel. The converter functor will allow you |
| 161 | // to convert the incoming raw to your custom data type T. This is a nonblocking function. |
| 162 | func StreamProcess(cmd *exec.Cmd, opts *StreamProcessOptions) (*ProcessStream, errors.Error) { |
| 163 | if opts == nil { |
| 164 | opts = &StreamProcessOptions{} |
| 165 | } |
| 166 | cmd.Env = append(cmd.Env, os.Environ()...) |
| 167 | pipes, err := getPipes(cmd, opts) |
| 168 | if err != nil { |
| 169 | return nil, err |
| 170 | } |
| 171 | if err = errors.Convert(cmd.Start()); err != nil { |
| 172 | return nil, err |
| 173 | } |
| 174 | receiveStream := make(chan *ProcessResponse, 32) |
| 175 | wg := &sync.WaitGroup{} |
| 176 | stdScanner := scanOutputPipe(pipes.stdout, wg, opts.OnStdout, func(result []byte) *ProcessResponse { |
| 177 | return &ProcessResponse{stdout: result} |
| 178 | }, receiveStream) |
| 179 | errScanner, remoteErrorMsg := scanErrorPipe(pipes.stderr, opts.OnStderr, receiveStream) |
| 180 | fdOutScanner := scanOutputPipe(pipes.fdOut, wg, opts.OnFdOut, func(result []byte) *ProcessResponse { |
| 181 | return &ProcessResponse{fdOut: result} |
| 182 | }, receiveStream) |
| 183 | wg.Add(2) |
| 184 | if pipes.fdOut != nil { |
| 185 | wg.Add(1) |
| 186 | } |
| 187 | go stdScanner() |
| 188 | go errScanner() |
| 189 | if pipes.fdOut != nil { |
| 190 | go fdOutScanner() |
| 191 | } |
| 192 | processStream := &ProcessStream{ |
| 193 | process: cmd.Process, |
| 194 | receiveChannel: receiveStream, |
| 195 | } |
| 196 | go func() { |
| 197 | defer pipes.close() |
| 198 | if err = errors.Convert(cmd.Wait()); err != nil { |
| 199 | if !processStream.cancelled { |
| 200 | receiveStream <- &ProcessResponse{err: errors.Default.Wrap(err, fmt.Sprintf("remote error response:\n%s", remoteErrorMsg))} |
| 201 | } |
| 202 | } |
| 203 | wg.Done() |
| 204 | }() |
| 205 | go func() { |
| 206 | defer close(receiveStream) |
| 207 | wg.Wait() |
| 208 | }() |
| 209 | return processStream, nil |
| 210 | } |
| 211 | |
| 212 | func getPipes(cmd *exec.Cmd, opts *StreamProcessOptions) (*processPipes, errors.Error) { |
| 213 | stdout, err := cmd.StdoutPipe() |
no test coverage detected