MCPcopy Index your code
hub / github.com/keploy/keploy / encodeHTTP

Method encodeHTTP

pkg/agent/proxy/integrations/http/encode.go:64–437  ·  view source on GitHub ↗

encodeHTTP records outgoing HTTP traffic. The read/forward/chunked-handling logic is identical to the original synchronous implementation. The only difference is that parseFinalHTTP (HTTP parsing, body decompression, mock creation) is offloaded to a background goroutine so it never blocks the forwar

(ctx context.Context, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions, onMockRecorded integrations.PostRecordHook)

Source from the content-addressed store, hash-verified

62// creation) is offloaded to a background goroutine so it never blocks the
63// forwarding path.
64func (h *HTTP) encodeHTTP(ctx context.Context, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions, onMockRecorded integrations.PostRecordHook) error {
65 remoteAddr := destConn.RemoteAddr().(*net.TCPAddr)
66 destPort := uint(remoteAddr.Port)
67
68 probeHTTP(ctx, h.Logger, "encode-start",
69 zap.String("dstAddr", destConn.RemoteAddr().String()),
70 zap.Int("reqBufLen", len(reqBuf)))
71
72 // Forward initial request to server.
73 reqWriteStart := time.Now()
74 _, err := destConn.Write(reqBuf)
75 probeHTTP(ctx, h.Logger, "encode-req-written",
76 zap.Int64("write_dur_ns", time.Since(reqWriteStart).Nanoseconds()),
77 zap.Error(err))
78 if err != nil {
79 h.Logger.Error("failed to write request message to the destination server", zap.Error(err))
80 return err
81 }
82
83 if ctx.Err() != nil {
84 return ctx.Err()
85 }
86
87 h.Logger.Debug("This is the initial request: " + string(reqBuf))
88 var finalReq []byte
89 errCh := make(chan error, 1)
90
91 finalReq = append(finalReq, reqBuf...)
92
93 // Get the error group from the context.
94 g, ok := ctx.Value(models.ErrGroupKey).(*errgroup.Group)
95 if !ok {
96 return errors.New("failed to get the error group from the context")
97 }
98
99 // Async mock recorder: parseFinalHTTP runs here off the forwarding path.
100 // The channel is buffered so the forwarding goroutine never blocks on send.
101 mockDataCh := make(chan *FinalHTTP, 256)
102 recorderDone := make(chan struct{})
103 recordCtx := context.WithoutCancel(ctx)
104 go func() {
105 defer pUtil.Recover(h.Logger, clientConn, destConn)
106 defer close(recorderDone)
107 for m := range mockDataCh {
108 err := h.parseFinalHTTP(recordCtx, m, destPort, mocks, opts, onMockRecorded)
109 if err != nil {
110 utils.LogError(h.Logger, err, "failed to parse the final http request and response")
111 }
112 }
113 }()
114
115 // enqueueMock sends a paired request/response for async mock creation.
116 // Non-blocking: if the recorder can't keep up, the mock is dropped
117 // (same pattern as postgres/mongo).
118 enqueueMock := func(req, resp []byte, reqTs, resTs time.Time) {
119 m := &FinalHTTP{
120 Req: req,
121 Resp: resp,

Callers 1

recordLegacyMethod · 0.95

Calls 15

parseFinalHTTPMethod · 0.95
HandleChunkedRequestsMethod · 0.95
LogErrorFunction · 0.92
IsRecordingPausedFunction · 0.92
CapturedReqTimeFunction · 0.92
CapturedRespTimeFunction · 0.92
probeHTTPFunction · 0.85
parseRequestMethodAndURLFunction · 0.85
upstreamRequestURLFunction · 0.85
upstreamErrorClassFunction · 0.85

Tested by

no test coverage detected