recordV2 is the FakeConn-based record path. The relay owns and writes the real sockets; recordV2 only observes the teed chunks via sess.ClientStream sess.DestStream and emits mocks with timestamps derived from the chunks' ReadAt / WrittenAt fields (never from time.Now()). The mock payload (headers,
(ctx context.Context, sess *supervisor.Session)
| 40 | // case it marks the session's mock incomplete so the supervisor can |
| 41 | // abort and fall through to passthrough). |
| 42 | func (h *HTTP) recordV2(ctx context.Context, sess *supervisor.Session) error { |
| 43 | if sess == nil { |
| 44 | return errors.New("recordV2: nil supervisor session") |
| 45 | } |
| 46 | logger := sess.Logger |
| 47 | if logger == nil { |
| 48 | logger = h.Logger |
| 49 | } |
| 50 | |
| 51 | destPort := destPortFromAddr(sess.DestStream) |
| 52 | |
| 53 | for { |
| 54 | if err := ctx.Err(); err != nil { |
| 55 | return err |
| 56 | } |
| 57 | |
| 58 | // --- Request side --------------------------------------------- |
| 59 | // |
| 60 | // First chunk's ReadAt is the request arrival timestamp. We grab |
| 61 | // it via ReadChunk so the timestamp is carried regardless of |
| 62 | // what ReadBytes does underneath. |
| 63 | firstChunk, err := sess.ClientStream.ReadChunk() |
| 64 | if err != nil { |
| 65 | if errors.Is(err, io.EOF) || errors.Is(err, fakeconn.ErrClosed) { |
| 66 | logger.Debug("V2 HTTP record: client stream ended at start of request", zap.Error(err)) |
| 67 | return nil |
| 68 | } |
| 69 | if ctx.Err() != nil { |
| 70 | return ctx.Err() |
| 71 | } |
| 72 | utils.LogError(logger, err, "V2 HTTP record: initial request read failed") |
| 73 | return err |
| 74 | } |
| 75 | if len(firstChunk.Bytes) == 0 { |
| 76 | // Empty synthetic chunk (channel close sentinel): treat as EOF. |
| 77 | return nil |
| 78 | } |
| 79 | reqTs := firstChunk.ReadAt |
| 80 | finalReq := append([]byte(nil), firstChunk.Bytes...) |
| 81 | |
| 82 | // Complete the request: read more chunks until headers + body |
| 83 | // are on hand. We use chunk-level reads (not HandleChunkedRequests |
| 84 | // which wraps bufio-like bulk reads over the conn) so we never |
| 85 | // over-read past the end of one request into the start of the |
| 86 | // next pipelined request. This is essential for HTTP/1.1 |
| 87 | // keepalive correctness. |
| 88 | if err := h.readRequestV2(ctx, sess.ClientStream, &finalReq); err != nil { |
| 89 | if errors.Is(err, io.EOF) || errors.Is(err, fakeconn.ErrClosed) { |
| 90 | logger.Debug("V2 HTTP record: client stream closed mid-request", zap.Error(err)) |
| 91 | return nil |
| 92 | } |
| 93 | if ctx.Err() != nil { |
| 94 | return ctx.Err() |
| 95 | } |
| 96 | sess.MarkMockIncomplete("http decode error: request read failed: " + err.Error()) |
| 97 | utils.LogError(logger, err, "V2 HTTP record: failed to read full request") |
| 98 | return err |
| 99 | } |