MCPcopy
hub / github.com/keploy/keploy / compareNDJSONStream

Function compareNDJSONStream

pkg/util.go:1178–1246  ·  view source on GitHub ↗
(expectedResp models.HTTPResp, stream io.Reader, jsonNoiseKeys map[string]struct{}, logger *zap.Logger)

Source from the content-addressed store, hash-verified

1176}
1177
1178func compareNDJSONStream(expectedResp models.HTTPResp, stream io.Reader, jsonNoiseKeys map[string]struct{}, logger *zap.Logger) (bool, string, *StreamMismatchInfo, error) {
1179 expectedQueue := extractExpectedRawQueue(expectedResp, canonicalizeNDJSONLine, true)
1180 actualQueue := make([]string, 0, len(expectedQueue))
1181 nextExpected := 0
1182
1183 scanner := bufio.NewScanner(stream)
1184 scanner.Buffer(make([]byte, 0, 64*1024), maxStreamTokenSize)
1185
1186 for scanner.Scan() {
1187 line := canonicalizeNDJSONLine(scanner.Text())
1188 if line == "" {
1189 continue
1190 }
1191
1192 if nextExpected >= len(expectedQueue) {
1193 logger.Debug("received additional NDJSON data after expected stream was fully matched; closing stream capture",
1194 zap.Int("expected_frames", len(expectedQueue)))
1195 break
1196 }
1197
1198 actualQueue = append(actualQueue, line)
1199 expected := expectedQueue[nextExpected]
1200 ok, cmpErr := compareJSONTextWithNoise(expected, line, jsonNoiseKeys)
1201 if cmpErr != nil || !ok {
1202 reason := "json mismatch"
1203 if cmpErr != nil {
1204 reason = cmpErr.Error()
1205 }
1206 logger.Debug("NDJSON stream mismatch",
1207 zap.Int("frame_index", nextExpected),
1208 zap.String("reason", reason),
1209 zap.String("expected_frame", expected),
1210 zap.String("actual_frame", line))
1211 mismatchInfo := &StreamMismatchInfo{
1212 FrameIndex: nextExpected,
1213 ExpectedFrame: expected,
1214 ActualFrame: line,
1215 Reason: reason,
1216 }
1217 return false, strings.Join(actualQueue, "\n"), mismatchInfo, nil
1218 }
1219
1220 nextExpected++
1221 if nextExpected == len(expectedQueue) {
1222 logger.Debug("all expected NDJSON frames matched; closing stream capture early to avoid waiting for extra stream events",
1223 zap.Int("matched_frames", nextExpected))
1224 break
1225 }
1226 }
1227
1228 if err := scanner.Err(); err != nil {
1229 return false, strings.Join(actualQueue, "\n"), nil, err
1230 }
1231
1232 if nextExpected < len(expectedQueue) {
1233 logger.Debug("NDJSON stream ended before all expected frames were received",
1234 zap.Int("expected_frames", len(expectedQueue)),
1235 zap.Int("matched_frames", nextExpected))

Callers 1

CompareHTTPStream · Function · 0.85

Calls 6

extractExpectedRawQueue · Function · 0.85
canonicalizeNDJSONLine · Function · 0.85
compareJSONTextWithNoise · Function · 0.85
Debug · Method · 0.65
Error · Method · 0.45
String · Method · 0.45

Tested by

no test coverage detected