| 66 | } |
| 67 | |
| 68 | func (a *AgentClient) GetIncoming(ctx context.Context, opts models.IncomingOptions) (<-chan *models.TestCase, error) { |
| 69 | |
| 70 | a.logger.Debug("Connecting to incoming test cases stream...") |
| 71 | |
| 72 | requestBody := models.IncomingReq{ |
| 73 | IncomingOptions: opts, |
| 74 | } |
| 75 | |
| 76 | requestJSON, err := json.Marshal(requestBody) |
| 77 | if err != nil { |
| 78 | utils.LogError(a.logger, err, "failed to marshal request body for incoming request") |
| 79 | return nil, fmt.Errorf("error marshaling request body for incoming request: %s", err.Error()) |
| 80 | } |
| 81 | |
| 82 | req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/incoming", a.conf.Agent.AgentURI), bytes.NewBuffer(requestJSON)) |
| 83 | if err != nil { |
| 84 | utils.LogError(a.logger, err, "failed to create request for incoming request") |
| 85 | return nil, fmt.Errorf("error creating request for incoming request: %s", err.Error()) |
| 86 | } |
| 87 | req.Header.Set("Content-Type", "application/json") |
| 88 | |
| 89 | // Make the HTTP request |
| 90 | res, err := a.client.Do(req) |
| 91 | if err != nil { |
| 92 | return nil, fmt.Errorf("failed to get incoming: %s", err.Error()) |
| 93 | } |
| 94 | |
| 95 | // Create a channel to stream TestCase data |
| 96 | tcChan := make(chan *models.TestCase) |
| 97 | |
| 98 | // Determine stream type |
| 99 | contentType := res.Header.Get("Content-Type") |
| 100 | mediaType, params, err := mime.ParseMediaType(contentType) |
| 101 | if err != nil { |
| 102 | utils.LogError(a.logger, err, "failed to parse content type", zap.String("content-type", contentType)) |
| 103 | } |
| 104 | |
| 105 | if strings.HasPrefix(mediaType, "multipart/") { |
| 106 | boundary := strings.TrimSpace(params["boundary"]) |
| 107 | if boundary == "" { |
| 108 | if res.Body != nil { |
| 109 | res.Body.Close() |
| 110 | } |
| 111 | return nil, fmt.Errorf("missing multipart boundary in content-type: %s", contentType) |
| 112 | } |
| 113 | go func() { |
| 114 | defer func() { |
| 115 | close(tcChan) |
| 116 | if res.Body != nil { |
| 117 | res.Body.Close() |
| 118 | } |
| 119 | }() |
| 120 | |
| 121 | mr := multipart.NewReader(res.Body, boundary) |
| 122 | var pendingTestCase *models.TestCase |
| 123 | |
| 124 | for { |
| 125 | part, err := mr.NextPart() |