encodeGenericV2 implements the V2 record path for the generic parser. It drains the two FakeConn streams (client and destination) concurrently and pairs their chunks into mocks with the same shape that the legacy encodeGeneric path produces, so that replay works against mocks recorded by either path.
(sess *supervisor.Session, logger *zap.Logger)
| 35 | // mocks with the same shape the legacy encodeGeneric path produces so |
| 36 | // that replay works against mocks recorded by either path. |
| 37 | func encodeGenericV2(sess *supervisor.Session, logger *zap.Logger) error { |
| 38 | if sess.ClientStream == nil || sess.DestStream == nil { |
| 39 | // Defensive: a session without streams can happen only if the |
| 40 | // supervisor is misconfigured, but returning nil is safer than |
| 41 | // panicking on a nil receiver deep inside a goroutine. |
| 42 | if logger != nil { |
| 43 | logger.Debug("generic v2: session missing streams, skipping") |
| 44 | } |
| 45 | return nil |
| 46 | } |
| 47 | |
| 48 | // The generic parser consumes exchanges as "one or more client |
| 49 | // chunks followed by one or more dest chunks." The relay only |
| 50 | // surfaces a dest chunk AFTER the client chunk that caused it has |
| 51 | // been forwarded (causality), so in production the first event on |
| 52 | // ClientStream strictly precedes anything on DestStream. We match |
| 53 | // that invariant here by reading the initial client chunk on the |
| 54 | // calling goroutine before starting the concurrent reader pair — |
| 55 | // otherwise a race between the two reader goroutines could observe |
| 56 | // a dest event before its paired client event on synthetic inputs |
| 57 | // where both streams are pre-primed. |
| 58 | events := make(chan chunkEvent, 16) |
| 59 | |
| 60 | initial, err := sess.ClientStream.ReadChunk() |
| 61 | if err != nil { |
| 62 | // No client bytes ever arrived: nothing to mock. |
| 63 | if logger != nil && !isBenignReadErr(err) { |
| 64 | logger.Debug("generic v2: initial client read failed", |
| 65 | zap.Error(err)) |
| 66 | } |
| 67 | return nil |
| 68 | } |
| 69 | |
| 70 | // Seed the events channel with the initial client chunk so the |
| 71 | // main loop's state machine treats it like any other. |
| 72 | if len(initial.Bytes) > 0 { |
| 73 | events <- chunkEvent{ |
| 74 | dir: fakeconn.FromClient, |
| 75 | bytes: initial.Bytes, |
| 76 | readAt: initial.ReadAt, |
| 77 | writtenAt: initial.WrittenAt, |
| 78 | } |
| 79 | } |
| 80 | |
| 81 | // Reader goroutines: one per direction. Each loop reads chunks from |
| 82 | // its FakeConn and forwards them onto the shared events channel |
| 83 | // until EOF / ErrClosed. We count their exits via a WaitGroup-style |
| 84 | // pattern (two reads off readerDone) and close the events channel |
| 85 | // only after both have exited so the main loop never observes a |
| 86 | // "close before drain" race. |
| 87 | readerDone := make(chan struct{}, 2) |
| 88 | go readStream(sess.ClientStream, fakeconn.FromClient, events, readerDone) |
| 89 | go readStream(sess.DestStream, fakeconn.FromDest, events, readerDone) |
| 90 | |
| 91 | // closer goroutine: waits for both readers to return, then closes |
| 92 | // events so the main-loop range exits exactly once. Without this, |
| 93 | // the main loop would need its own termination condition based on |
| 94 | // per-side done flags, which races against in-flight data events |
no test coverage detected