| 28 | } |
| 29 | |
| 30 | func (p *HTTPProtocol) Dispatch(ctx context.Context, t task.Request) error { |
| 31 | var retErr error |
| 32 | done := make(chan struct{}) |
| 33 | go func() { |
| 34 | var body bytes.Buffer |
| 35 | io.Copy(&body, t.Config.Stdin) |
| 36 | req, err := http.NewRequest("GET", "/", &body) |
| 37 | if err != nil { |
| 38 | retErr = err |
| 39 | return |
| 40 | } |
| 41 | for k, v := range t.Config.Env { |
| 42 | req.Header.Set(k, v) |
| 43 | } |
| 44 | req.Header.Set("Content-Length", fmt.Sprint(body.Len())) |
| 45 | req.Header.Set("Task-ID", t.Config.ID) |
| 46 | raw, err := httputil.DumpRequest(req, true) |
| 47 | if err != nil { |
| 48 | retErr = err |
| 49 | return |
| 50 | } |
| 51 | p.in.Write(raw) |
| 52 | |
| 53 | res, err := http.ReadResponse(bufio.NewReader(p.out), req) |
| 54 | if err != nil { |
| 55 | retErr = err |
| 56 | return |
| 57 | } |
| 58 | |
| 59 | io.Copy(t.Config.Stdout, res.Body) |
| 60 | done <- struct{}{} |
| 61 | }() |
| 62 | |
| 63 | timeout := time.After(t.Config.Timeout) |
| 64 | |
| 65 | select { |
| 66 | case <-ctx.Done(): |
| 67 | return ctx.Err() |
| 68 | case <-timeout: |
| 69 | return models.ErrRunnerTimeout |
| 70 | case <-done: |
| 71 | return retErr |
| 72 | } |
| 73 | } |