Start starts the commands in the pipeline. If `Start()` exits without an error, `Wait()` must also be called, to allow all resources to be freed.
(ctx context.Context)
| 122 | // without an error, `Wait()` must also be called, to allow all |
| 123 | // resources to be freed. |
| 124 | func (p *Pipeline) Start(ctx context.Context) error { |
| 125 | if p.hasStarted() { |
| 126 | panic("attempt to start a pipeline that has already started") |
| 127 | } |
| 128 | |
| 129 | atomic.StoreUint32(&p.started, 1) |
| 130 | ctx, p.cancel = context.WithCancel(ctx) |
| 131 | |
| 132 | var nextStdin io.ReadCloser |
| 133 | if p.stdin != nil { |
| 134 | // We don't want the first stage to actually close this, and |
| 135 | // it's not even an `io.ReadCloser`, so fake it: |
| 136 | nextStdin = io.NopCloser(p.stdin) |
| 137 | } |
| 138 | |
| 139 | for i, s := range p.stages { |
| 140 | var err error |
| 141 | stdout, err := s.Start(ctx, p.env, nextStdin) |
| 142 | if err != nil { |
| 143 | // Close the pipe that the previous stage was writing to. |
| 144 | // That should cause it to exit even if it's not minding |
| 145 | // its context. |
| 146 | if nextStdin != nil { |
| 147 | _ = nextStdin.Close() |
| 148 | } |
| 149 | |
| 150 | // Kill and wait for any stages that have been started |
| 151 | // already to finish: |
| 152 | p.cancel() |
| 153 | for _, s := range p.stages[:i] { |
| 154 | _ = s.Wait() |
| 155 | } |
| 156 | return fmt.Errorf("starting pipeline stage %q: %w", s.Name(), err) |
| 157 | } |
| 158 | nextStdin = stdout |
| 159 | } |
| 160 | |
| 161 | // If the pipeline was configured with a `stdout`, add a synthetic |
| 162 | // stage to copy the last stage's stdout to that writer: |
| 163 | if p.stdout != nil { |
| 164 | c := newIOCopier(p.stdout) |
| 165 | p.stages = append(p.stages, c) |
| 166 | // `ioCopier.Start()` never fails: |
| 167 | _, _ = c.Start(ctx, p.env, nextStdin) |
| 168 | } |
| 169 | |
| 170 | return nil |
| 171 | } |
| 172 | |
| 173 | func (p *Pipeline) Output(ctx context.Context) ([]byte, error) { |
| 174 | var buf bytes.Buffer |
no test coverage detected