SimulateHTTPStreaming sends an HTTP request and returns the streaming response with a reader. The caller is responsible for reading from the response Reader and closing it when done. Use CompareHTTPStream to compare the streaming response with expected values.
(ctx context.Context, tc *models.TestCase, testSet string, logger *zap.Logger, cfg SimulationConfig)
| 782 | // The caller is responsible for reading from the response Reader and closing it when done. |
| 783 | // Use CompareHTTPStream to compare the streaming response with expected values. |
| 784 | func SimulateHTTPStreaming(ctx context.Context, tc *models.TestCase, testSet string, logger *zap.Logger, cfg SimulationConfig) (*StreamingHTTPResponse, error) { |
| 785 | logger.Info("starting streaming test for", zap.Any("test case", models.HighlightString(tc.Name)), zap.Any("test set", models.HighlightString(testSet))) |
| 786 | |
| 787 | // Calculate streaming timeout |
| 788 | APITimeout := ComputeStreamingTimeoutSeconds(tc, cfg.APITimeout) |
| 789 | cfg.RequestTimeout = time.Second * time.Duration(APITimeout) |
| 790 | |
| 791 | // Prepare the HTTP request using the shared helper |
| 792 | prepared, err := prepareHTTPRequest(ctx, tc, testSet, logger, cfg) |
| 793 | if err != nil { |
| 794 | return nil, err |
| 795 | } |
| 796 | |
| 797 | logger.Debug(fmt.Sprintf("Sending streaming request to user app:%v", prepared.Request)) |
| 798 | |
| 799 | // Execute the request (re-sending only on a pre-response connection-refused) |
| 800 | httpResp, errHTTPReq := doRequestWithConnRefusedRetry(ctx, logger, prepared.Client, prepared.Request) |
| 801 | if errHTTPReq != nil { |
| 802 | utils.LogError(logger, errHTTPReq, "failed to send testcase request to app") |
| 803 | return nil, errHTTPReq |
| 804 | } |
| 805 | |
| 806 | statusMessage := http.StatusText(httpResp.StatusCode) |
| 807 | streamCfg := DetectHTTPStreamConfig(tc, httpResp) |
| 808 | |
| 809 | logger.Debug("detected HTTP streaming response", |
| 810 | zap.String("testcase", tc.Name), |
| 811 | zap.String("content_type", httpResp.Header.Get("Content-Type")), |
| 812 | zap.String("stream_mode", string(streamCfg.Mode))) |
| 813 | |
| 814 | // Wrap the response body with decompression if needed |
| 815 | streamReader := io.ReadCloser(httpResp.Body) |
| 816 | contentEncoding := strings.ToLower(strings.TrimSpace(httpResp.Header.Get("Content-Encoding"))) |
| 817 | switch contentEncoding { |
| 818 | case "gzip": |
| 819 | gzipReader, gzErr := gzip.NewReader(httpResp.Body) |
| 820 | if gzErr != nil { |
| 821 | httpResp.Body.Close() |
| 822 | utils.LogError(logger, gzErr, "failed to create gzip reader for streaming response") |
| 823 | return nil, gzErr |
| 824 | } |
| 825 | streamReader = &gzipReadCloser{gzipReader: gzipReader, underlying: httpResp.Body} |
| 826 | case "br": |
| 827 | streamReader = &brotliReadCloser{reader: brotli.NewReader(httpResp.Body), underlying: httpResp.Body} |
| 828 | case "": |
| 829 | // no-op, use httpResp.Body directly |
| 830 | default: |
| 831 | logger.Debug("unsupported content-encoding for stream; returning raw response body", |
| 832 | zap.String("content_encoding", contentEncoding)) |
| 833 | } |
| 834 | |
| 835 | return &StreamingHTTPResponse{ |
| 836 | StatusCode: httpResp.StatusCode, |
| 837 | StatusMessage: statusMessage, |
| 838 | Header: ToYamlHTTPHeader(httpResp.Header), |
| 839 | Reader: streamReader, |
| 840 | StreamConfig: streamCfg, |
| 841 | }, nil |