hub / github.com/keploy/keploy / encodeGenericV2

Function encodeGenericV2

pkg/agent/proxy/integrations/generic/encode_v2.go:37–242

encodeGenericV2 implements the V2 record path for the generic parser. It drains the session's two FakeConn streams (ClientStream and DestStream) concurrently and pairs their chunks into mocks. The mocks have the same shape as those produced by the legacy encodeGeneric path, so replay works against mocks recorded by either path.

(sess *supervisor.Session, logger *zap.Logger)

Source from the content-addressed store, hash-verified

// mocks with the same shape the legacy encodeGeneric path produces so
// that replay works against mocks recorded by either path.
func encodeGenericV2(sess *supervisor.Session, logger *zap.Logger) error {
	if sess.ClientStream == nil || sess.DestStream == nil {
		// Defensive: a session without streams can happen only if the
		// supervisor is misconfigured, but returning nil is safer than
		// panicking on a nil receiver deep inside a goroutine.
		if logger != nil {
			logger.Debug("generic v2: session missing streams, skipping")
		}
		return nil
	}

	// The generic parser consumes exchanges as "one or more client
	// chunks followed by one or more dest chunks." The relay only
	// surfaces a dest chunk AFTER the client chunk that caused it has
	// been forwarded (causality), so in production the first event on
	// ClientStream strictly precedes anything on DestStream. We match
	// that invariant here by reading the initial client chunk on the
	// calling goroutine before starting the concurrent reader pair —
	// otherwise a race between the two reader goroutines could observe
	// a dest event before its paired client event on synthetic inputs
	// where both streams are pre-primed.
	events := make(chan chunkEvent, 16)

	initial, err := sess.ClientStream.ReadChunk()
	if err != nil {
		// No client bytes ever arrived: nothing to mock.
		if logger != nil && !isBenignReadErr(err) {
			logger.Debug("generic v2: initial client read failed",
				zap.Error(err))
		}
		return nil
	}

	// Seed the events channel with the initial client chunk so the
	// main loop's state machine treats it like any other.
	if len(initial.Bytes) > 0 {
		events <- chunkEvent{
			dir:       fakeconn.FromClient,
			bytes:     initial.Bytes,
			readAt:    initial.ReadAt,
			writtenAt: initial.WrittenAt,
		}
	}

	// Reader goroutines: one per direction. Each loop reads chunks from
	// its FakeConn and forwards them onto the shared events channel
	// until EOF / ErrClosed. We count their exits via a WaitGroup-style
	// pattern (two reads off readerDone) and close the events channel
	// only after both have exited so the main loop never observes a
	// "close before drain" race.
	readerDone := make(chan struct{}, 2)
	go readStream(sess.ClientStream, fakeconn.FromClient, events, readerDone)
	go readStream(sess.DestStream, fakeconn.FromDest, events, readerDone)

	// closer goroutine: waits for both readers to return, then closes
	// events so the main-loop range exits exactly once. Without this,
	// the main loop would need its own termination condition based on
	// per-side done flags, which races against in-flight data events

Callers 1

recordV2 · Method · 0.85

Calls 11

GetVersion · Function · 0.92
isBenignReadErr · Function · 0.85
readStream · Function · 0.85
encodePayload · Function · 0.85
ReadChunk · Method · 0.80
IsMockIncomplete · Method · 0.80
MarkMockComplete · Method · 0.80
EmitMock · Method · 0.80
Debug · Method · 0.65
Error · Method · 0.45
String · Method · 0.45

Tested by

no test coverage detected