(expectedResp models.HTTPResp, stream io.Reader, jsonNoiseKeys map[string]struct{}, logger *zap.Logger)
| 1176 | } |
| 1177 | |
| 1178 | func compareNDJSONStream(expectedResp models.HTTPResp, stream io.Reader, jsonNoiseKeys map[string]struct{}, logger *zap.Logger) (bool, string, *StreamMismatchInfo, error) { |
| 1179 | expectedQueue := extractExpectedRawQueue(expectedResp, canonicalizeNDJSONLine, true) |
| 1180 | actualQueue := make([]string, 0, len(expectedQueue)) |
| 1181 | nextExpected := 0 |
| 1182 | |
| 1183 | scanner := bufio.NewScanner(stream) |
| 1184 | scanner.Buffer(make([]byte, 0, 64*1024), maxStreamTokenSize) |
| 1185 | |
| 1186 | for scanner.Scan() { |
| 1187 | line := canonicalizeNDJSONLine(scanner.Text()) |
| 1188 | if line == "" { |
| 1189 | continue |
| 1190 | } |
| 1191 | |
| 1192 | if nextExpected >= len(expectedQueue) { |
| 1193 | logger.Debug("received additional NDJSON data after expected stream was fully matched; closing stream capture", |
| 1194 | zap.Int("expected_frames", len(expectedQueue))) |
| 1195 | break |
| 1196 | } |
| 1197 | |
| 1198 | actualQueue = append(actualQueue, line) |
| 1199 | expected := expectedQueue[nextExpected] |
| 1200 | ok, cmpErr := compareJSONTextWithNoise(expected, line, jsonNoiseKeys) |
| 1201 | if cmpErr != nil || !ok { |
| 1202 | reason := "json mismatch" |
| 1203 | if cmpErr != nil { |
| 1204 | reason = cmpErr.Error() |
| 1205 | } |
| 1206 | logger.Debug("NDJSON stream mismatch", |
| 1207 | zap.Int("frame_index", nextExpected), |
| 1208 | zap.String("reason", reason), |
| 1209 | zap.String("expected_frame", expected), |
| 1210 | zap.String("actual_frame", line)) |
| 1211 | mismatchInfo := &StreamMismatchInfo{ |
| 1212 | FrameIndex: nextExpected, |
| 1213 | ExpectedFrame: expected, |
| 1214 | ActualFrame: line, |
| 1215 | Reason: reason, |
| 1216 | } |
| 1217 | return false, strings.Join(actualQueue, "\n"), mismatchInfo, nil |
| 1218 | } |
| 1219 | |
| 1220 | nextExpected++ |
| 1221 | if nextExpected == len(expectedQueue) { |
| 1222 | logger.Debug("all expected NDJSON frames matched; closing stream capture early to avoid waiting for extra stream events", |
| 1223 | zap.Int("matched_frames", nextExpected)) |
| 1224 | break |
| 1225 | } |
| 1226 | } |
| 1227 | |
| 1228 | if err := scanner.Err(); err != nil { |
| 1229 | return false, strings.Join(actualQueue, "\n"), nil, err |
| 1230 | } |
| 1231 | |
| 1232 | if nextExpected < len(expectedQueue) { |
| 1233 | logger.Debug("NDJSON stream ended before all expected frames were received", |
| 1234 | zap.Int("expected_frames", len(expectedQueue)), |
| 1235 | zap.Int("matched_frames", nextExpected)) |
no test coverage detected