encodeHTTP records outgoing HTTP traffic. The read/forward/chunked-handling logic is identical to the original synchronous implementation. The only difference is that parseFinalHTTP (HTTP parsing, body decompression, mock creation) is offloaded to a background goroutine so it never blocks the forwar
(ctx context.Context, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions, onMockRecorded integrations.PostRecordHook)
| 62 | // creation) is offloaded to a background goroutine so it never blocks the |
| 63 | // forwarding path. |
| 64 | func (h *HTTP) encodeHTTP(ctx context.Context, reqBuf []byte, clientConn, destConn net.Conn, mocks chan<- *models.Mock, opts models.OutgoingOptions, onMockRecorded integrations.PostRecordHook) error { |
| 65 | remoteAddr := destConn.RemoteAddr().(*net.TCPAddr) |
| 66 | destPort := uint(remoteAddr.Port) |
| 67 | |
| 68 | probeHTTP(ctx, h.Logger, "encode-start", |
| 69 | zap.String("dstAddr", destConn.RemoteAddr().String()), |
| 70 | zap.Int("reqBufLen", len(reqBuf))) |
| 71 | |
| 72 | // Forward initial request to server. |
| 73 | reqWriteStart := time.Now() |
| 74 | _, err := destConn.Write(reqBuf) |
| 75 | probeHTTP(ctx, h.Logger, "encode-req-written", |
| 76 | zap.Int64("write_dur_ns", time.Since(reqWriteStart).Nanoseconds()), |
| 77 | zap.Error(err)) |
| 78 | if err != nil { |
| 79 | h.Logger.Error("failed to write request message to the destination server", zap.Error(err)) |
| 80 | return err |
| 81 | } |
| 82 | |
| 83 | if ctx.Err() != nil { |
| 84 | return ctx.Err() |
| 85 | } |
| 86 | |
| 87 | h.Logger.Debug("This is the initial request: " + string(reqBuf)) |
| 88 | var finalReq []byte |
| 89 | errCh := make(chan error, 1) |
| 90 | |
| 91 | finalReq = append(finalReq, reqBuf...) |
| 92 | |
| 93 | // Get the error group from the context. |
| 94 | g, ok := ctx.Value(models.ErrGroupKey).(*errgroup.Group) |
| 95 | if !ok { |
| 96 | return errors.New("failed to get the error group from the context") |
| 97 | } |
| 98 | |
| 99 | // Async mock recorder: parseFinalHTTP runs here off the forwarding path. |
| 100 | // The channel is buffered so the forwarding goroutine never blocks on send. |
| 101 | mockDataCh := make(chan *FinalHTTP, 256) |
| 102 | recorderDone := make(chan struct{}) |
| 103 | recordCtx := context.WithoutCancel(ctx) |
| 104 | go func() { |
| 105 | defer pUtil.Recover(h.Logger, clientConn, destConn) |
| 106 | defer close(recorderDone) |
| 107 | for m := range mockDataCh { |
| 108 | err := h.parseFinalHTTP(recordCtx, m, destPort, mocks, opts, onMockRecorded) |
| 109 | if err != nil { |
| 110 | utils.LogError(h.Logger, err, "failed to parse the final http request and response") |
| 111 | } |
| 112 | } |
| 113 | }() |
| 114 | |
| 115 | // enqueueMock sends a paired request/response for async mock creation. |
| 116 | // Non-blocking: if the recorder can't keep up, the mock is dropped |
| 117 | // (same pattern as postgres/mongo). |
| 118 | enqueueMock := func(req, resp []byte, reqTs, resTs time.Time) { |
| 119 | m := &FinalHTTP{ |
| 120 | Req: req, |
| 121 | Resp: resp, |
no test coverage detected