MCPcopy
hub / github.com/keploy/keploy / handleHttp1ZeroCopy

Method handleHttp1ZeroCopy

pkg/agent/proxy/incoming/http.go:1042–1593  ·  view source on GitHub ↗

handleHttp1ZeroCopy handles HTTP/1.x connections in normal (non-sync) and sampling modes. Architecture (Option 3 + MSG_PEEK + replay-on-stale): the function frames HTTP/1.1 requests on the downstream so it can manage upstream connection lifetime per request. Before reusing a pooled upstream conn it

(ctx context.Context, clientConn net.Conn, upConn net.Conn, logger *zap.Logger, t chan *models.TestCase, appPort uint16, forceClose bool, releaseSlot func())

Source from the content-addressed store, hash-verified

1040// connection lifetime for normal mode and sampling-bypass; only upstream
1041// conns get redialed on stale detect.
1042func (pm *IngressProxyManager) handleHttp1ZeroCopy(ctx context.Context, clientConn net.Conn, upConn net.Conn, logger *zap.Logger, t chan *models.TestCase, appPort uint16, forceClose bool, releaseSlot func()) {
1043 logger.Debug("Using request-framed HTTP/1.1 proxy with MSG_PEEK pool liveness")
1044
1045 upstreamAddr := upConn.RemoteAddr().String()
1046 // Capture the network family of the original conn so redial uses
1047 // the same one — hard-coding "tcp4" would break against an IPv6
1048 // upstream and could mismatch a "tcp" dual-stack listener.
1049 upstreamNetwork := upConn.RemoteAddr().Network()
1050
1051 // upConnHolder lets the ctx-cancel + function-exit goroutines safely
1052 // close whatever the CURRENT upstream conn is, without racing with
1053 // redial()'s reassignment. The main loop still uses the local upConn
1054 // variable for fast access (no atomic load on the hot path); redial
1055 // is responsible for keeping the holder in sync.
1056 type connHolder struct{ c net.Conn }
1057 var upConnHolder atomic.Pointer[connHolder]
1058 upConnHolder.Store(&connHolder{c: upConn})
1059
1060 // On agent shutdown, close BOTH conns: closing the downstream
1061 // unblocks http.ReadRequest in the loop's read phase; closing the
1062 // upstream unblocks http.ReadResponse / req.Write if the loop is
1063 // stuck in upstream I/O on a hung backend. Race-safe via the
1064 // atomic — we always close whatever connHolder pointer is current,
1065 // even if redial swapped it concurrently.
1066 go func() {
1067 <-ctx.Done()
1068 _ = clientConn.Close()
1069 if h := upConnHolder.Load(); h != nil {
1070 _ = h.c.Close()
1071 }
1072 }()
1073
1074 wireConn := &wireTimeConn{Conn: clientConn}
1075 clientReader := bufio.NewReader(wireConn)
1076 upstreamReader := bufio.NewReader(upConn)
1077 // t == nil → caller (sampling-bypass path) wants MSG_PEEK protection
1078 // without capture. Suppress capture for this connection's lifetime.
1079 captureEnabled := t != nil && !isIngressRecordingPaused()
1080
1081 // forceCloseActive mirrors the caller's forceClose intent but can be
1082 // flipped to false mid-connection by demoteToBypass when the exchange
1083 // turns out to be ineligible for the sampling budget (chunked /
1084 // unknown-length body, or mid-stream memory pressure). Once demoted,
1085 // the connection becomes pure passthrough: no Connection: close
1086 // injection on subsequent iterations, the sampling slot is released,
1087 // and capture is disabled — matching the "untracked + unchanged" rule
1088 // for non-capturable exchanges. The flag never re-enables itself.
1089 forceCloseActive := forceClose
1090 demoteToBypass := func() {
1091 if !forceCloseActive {
1092 return
1093 }
1094 forceCloseActive = false
1095 captureEnabled = false
1096 if releaseSlot != nil {
1097 releaseSlot()
1098 }
1099 }

Callers 1

handleHttp1ConnectionMethod · 0.95

Calls 15

LastReadTimeMethod · 0.95
TruncatedMethod · 0.95
BytesMethod · 0.95
loadIncomingOptsMethod · 0.95
ParseHTTPRequestFunction · 0.92
ParseHTTPResponseFunction · 0.92
writeBadRequestFunction · 0.85
isHTTPUpgradeFunction · 0.85
isChunkedFunction · 0.85
writeBadGatewayFunction · 0.85
forwardRawTCPFunction · 0.85
isIdempotentMethodFunction · 0.85

Tested by

no test coverage detected