MCPcopy
hub / github.com/keploy/keploy / asyncMySQLDecode

Function asyncMySQLDecode

pkg/agent/proxy/integrations/mysql/recorder/query.go:371–712  ·  view source on GitHub ↗

asyncMySQLDecode runs in a background goroutine and processes forwarded chunks in FIFO order. It reassembles MySQL packets, decodes them, pairs commands with responses, and records mocks — all off the forwarding path.

(ctx context.Context, logger *zap.Logger, decodeChan <-chan mysqlDecodeItem, mocks chan<- *models.Mock, decodeCtx *wire.DecodeContext, clientConn net.Conn, opts models.OutgoingOptions)

Source from the content-addressed store, hash-verified

369// chunks in FIFO order. It reassembles MySQL packets, decodes them, pairs
370// commands with responses, and records mocks — all off the forwarding path.
371func asyncMySQLDecode(ctx context.Context, logger *zap.Logger, decodeChan <-chan mysqlDecodeItem, mocks chan<- *models.Mock, decodeCtx *wire.DecodeContext, clientConn net.Conn, opts models.OutgoingOptions) {
372 var clientReassembly mysqlReassemblyBuffer
373 var destReassembly mysqlReassemblyBuffer
374 var clientOverflowLogged, destOverflowLogged bool
375
376 state := stateExpectCommand
377
378 // Current command being processed.
379 var (
380 pendingCommand *mysql.PacketBundle
381 reqTimestamp time.Time
382 resTimestamp time.Time
383 pendingRespBundle *mysql.PacketBundle // accumulated response
384 lastOp byte // the MySQL command type
385 remainingCols uint64 // columns left to read
386 remainingParams uint16 // params left to read (stmt prepare)
387 )
388
389 // Temporary storage for result set assembly.
390 var (
391 textResultSet *mysql.TextResultSet
392 binaryResultSet *mysql.BinaryProtocolResultSet
393 stmtPrepareOk *mysql.StmtPrepareOkPacket
394 )
395
396 flushMock := func() {
397 if pendingCommand == nil || pendingRespBundle == nil {
398 return
399 }
400 // If the recorder is force-flushing mid-result-set (i.e., the
401 // next client command arrived before we observed the trailing
402 // EOF / OK_with_EOF terminator from the server), the encoded
403 // mock would otherwise have FinalResponse == nil and the replay
404 // encoder would emit no terminator. Drivers that strictly
405 // require the terminator — most notably Connector/J on Java 8 —
406 // then block in socketRead0 forever waiting for it.
407 //
408 // This race is intrinsic to the byte-relay → async-decode split:
409 // clientBuffChan / destBuffChan feed asyncMySQLDecode through a
410 // single FIFO channel via a non-deterministic select in
411 // handleClientQueries, so on a fast loopback the next client
412 // command can be enqueued before the server's terminator chunk.
413 // Guarding the select doesn't fix it without serializing all
414 // recording, so we synthesize a structurally-correct terminator
415 // here, matching the recorded server's negotiated capabilities,
416 // and stamp it onto the in-memory result set before flushing.
417 closeIncompleteResultSetForFlush(state, decodeCtx,
418 textResultSet, binaryResultSet, pendingRespBundle)
419 requests := []mysql.Request{{PacketBundle: *pendingCommand}}
420 responses := []mysql.Response{{PacketBundle: *pendingRespBundle}}
421 respOp := pendingRespBundle.Header.Type
422 // Lifetime classification at record time: prepared-statement
423 // setup (COM_STMT_PREPARE → StmtPrepareOkPacket) is connection-
424 // scoped. The executes that reference the statement by id on
425 // the same connection may land in a different test's window, so
426 // tagging as per-test ("mocks") would have the strict-window
427 // filter drop the setup and break replay. Tagging as session
428 // ("config") would share it across unrelated connections,

Callers 1

handleClientQueriesFunction · 0.85

Calls 15

appendMethod · 0.95
didOverflowMethod · 0.95
extractCompletePacketMethod · 0.95
DecodePayloadFunction · 0.92
IsNoResponseCommandFunction · 0.92
DecodeColumnFunction · 0.92
StatusToStringFunction · 0.92
DecodeTextRowFunction · 0.92
DecodeBinaryRowFunction · 0.92
recordMockFunction · 0.85
processFirstResponseFunction · 0.85

Tested by

no test coverage detected