handleHttp1ZeroCopy handles HTTP/1.x connections in normal (non-sync) and sampling modes. Architecture (Option 3 + MSG_PEEK + replay-on-stale): the function frames HTTP/1.1 requests on the downstream so it can manage upstream connection lifetime per request. Before reusing a pooled upstream conn it
(ctx context.Context, clientConn net.Conn, upConn net.Conn, logger *zap.Logger, t chan *models.TestCase, appPort uint16, forceClose bool, releaseSlot func())
| 1040 | // connection lifetime for normal mode and sampling-bypass; only upstream |
| 1041 | // conns get redialed on stale detect. |
| 1042 | func (pm *IngressProxyManager) handleHttp1ZeroCopy(ctx context.Context, clientConn net.Conn, upConn net.Conn, logger *zap.Logger, t chan *models.TestCase, appPort uint16, forceClose bool, releaseSlot func()) { |
| 1043 | logger.Debug("Using request-framed HTTP/1.1 proxy with MSG_PEEK pool liveness") |
| 1044 | |
| 1045 | upstreamAddr := upConn.RemoteAddr().String() |
| 1046 | // Capture the network family of the original conn so redial uses |
| 1047 | // the same one — hard-coding "tcp4" would break against an IPv6 |
| 1048 | // upstream and could mismatch a "tcp" dual-stack listener. |
| 1049 | upstreamNetwork := upConn.RemoteAddr().Network() |
| 1050 | |
| 1051 | // upConnHolder lets the ctx-cancel + function-exit goroutines safely |
| 1052 | // close whatever the CURRENT upstream conn is, without racing with |
| 1053 | // redial()'s reassignment. The main loop still uses the local upConn |
| 1054 | // variable for fast access (no atomic load on the hot path); redial |
| 1055 | // is responsible for keeping the holder in sync. |
| 1056 | type connHolder struct{ c net.Conn } |
| 1057 | var upConnHolder atomic.Pointer[connHolder] |
| 1058 | upConnHolder.Store(&connHolder{c: upConn}) |
| 1059 | |
| 1060 | // On agent shutdown, close BOTH conns: closing the downstream |
| 1061 | // unblocks http.ReadRequest in the loop's read phase; closing the |
| 1062 | // upstream unblocks http.ReadResponse / req.Write if the loop is |
| 1063 | // stuck in upstream I/O on a hung backend. Race-safe via the |
| 1064 | // atomic — we always close whatever connHolder pointer is current, |
| 1065 | // even if redial swapped it concurrently. |
| 1066 | go func() { |
| 1067 | <-ctx.Done() |
| 1068 | _ = clientConn.Close() |
| 1069 | if h := upConnHolder.Load(); h != nil { |
| 1070 | _ = h.c.Close() |
| 1071 | } |
| 1072 | }() |
| 1073 | |
| 1074 | wireConn := &wireTimeConn{Conn: clientConn} |
| 1075 | clientReader := bufio.NewReader(wireConn) |
| 1076 | upstreamReader := bufio.NewReader(upConn) |
| 1077 | // t == nil → caller (sampling-bypass path) wants MSG_PEEK protection |
| 1078 | // without capture. Suppress capture for this connection's lifetime. |
| 1079 | captureEnabled := t != nil && !isIngressRecordingPaused() |
| 1080 | |
| 1081 | // forceCloseActive mirrors the caller's forceClose intent but can be |
| 1082 | // flipped to false mid-connection by demoteToBypass when the exchange |
| 1083 | // turns out to be ineligible for the sampling budget (chunked / |
| 1084 | // unknown-length body, or mid-stream memory pressure). Once demoted, |
| 1085 | // the connection becomes pure passthrough: no Connection: close |
| 1086 | // injection on subsequent iterations, the sampling slot is released, |
| 1087 | // and capture is disabled — matching the "untracked + unchanged" rule |
| 1088 | // for non-capturable exchanges. The flag never re-enables itself. |
| 1089 | forceCloseActive := forceClose |
| 1090 | demoteToBypass := func() { |
| 1091 | if !forceCloseActive { |
| 1092 | return |
| 1093 | } |
| 1094 | forceCloseActive = false |
| 1095 | captureEnabled = false |
| 1096 | if releaseSlot != nil { |
| 1097 | releaseSlot() |
| 1098 | } |
| 1099 | } |
no test coverage detected