asyncMySQLDecode runs in a background goroutine and processes forwarded chunks in FIFO order. It reassembles MySQL packets, decodes them, pairs commands with responses, and records mocks — all off the forwarding path.
(ctx context.Context, logger *zap.Logger, decodeChan <-chan mysqlDecodeItem, mocks chan<- *models.Mock, decodeCtx *wire.DecodeContext, clientConn net.Conn, opts models.OutgoingOptions)
| 369 | // chunks in FIFO order. It reassembles MySQL packets, decodes them, pairs |
| 370 | // commands with responses, and records mocks — all off the forwarding path. |
| 371 | func asyncMySQLDecode(ctx context.Context, logger *zap.Logger, decodeChan <-chan mysqlDecodeItem, mocks chan<- *models.Mock, decodeCtx *wire.DecodeContext, clientConn net.Conn, opts models.OutgoingOptions) { |
| 372 | var clientReassembly mysqlReassemblyBuffer |
| 373 | var destReassembly mysqlReassemblyBuffer |
| 374 | var clientOverflowLogged, destOverflowLogged bool |
| 375 | |
| 376 | state := stateExpectCommand |
| 377 | |
| 378 | // Current command being processed. |
| 379 | var ( |
| 380 | pendingCommand *mysql.PacketBundle |
| 381 | reqTimestamp time.Time |
| 382 | resTimestamp time.Time |
| 383 | pendingRespBundle *mysql.PacketBundle // accumulated response |
| 384 | lastOp byte // the MySQL command type |
| 385 | remainingCols uint64 // columns left to read |
| 386 | remainingParams uint16 // params left to read (stmt prepare) |
| 387 | ) |
| 388 | |
| 389 | // Temporary storage for result set assembly. |
| 390 | var ( |
| 391 | textResultSet *mysql.TextResultSet |
| 392 | binaryResultSet *mysql.BinaryProtocolResultSet |
| 393 | stmtPrepareOk *mysql.StmtPrepareOkPacket |
| 394 | ) |
| 395 | |
| 396 | flushMock := func() { |
| 397 | if pendingCommand == nil || pendingRespBundle == nil { |
| 398 | return |
| 399 | } |
| 400 | // If the recorder is force-flushing mid-result-set (i.e., the |
| 401 | // next client command arrived before we observed the trailing |
| 402 | // EOF / OK_with_EOF terminator from the server), the encoded |
| 403 | // mock would otherwise have FinalResponse == nil and the replay |
| 404 | // encoder would emit no terminator. Drivers that strictly |
| 405 | // require the terminator — most notably Connector/J on Java 8 — |
| 406 | // then block in socketRead0 forever waiting for it. |
| 407 | // |
| 408 | // This race is intrinsic to the byte-relay → async-decode split: |
| 409 | // clientBuffChan / destBuffChan feed asyncMySQLDecode through a |
| 410 | // single FIFO channel via a non-deterministic select in |
| 411 | // handleClientQueries, so on a fast loopback the next client |
| 412 | // command can be enqueued before the server's terminator chunk. |
| 413 | // Guarding the select doesn't fix it without serializing all |
| 414 | // recording, so we synthesize a structurally-correct terminator |
| 415 | // here, matching the recorded server's negotiated capabilities, |
| 416 | // and stamp it onto the in-memory result set before flushing. |
| 417 | closeIncompleteResultSetForFlush(state, decodeCtx, |
| 418 | textResultSet, binaryResultSet, pendingRespBundle) |
| 419 | requests := []mysql.Request{{PacketBundle: *pendingCommand}} |
| 420 | responses := []mysql.Response{{PacketBundle: *pendingRespBundle}} |
| 421 | respOp := pendingRespBundle.Header.Type |
| 422 | // Lifetime classification at record time: prepared-statement |
| 423 | // setup (COM_STMT_PREPARE → StmtPrepareOkPacket) is connection- |
| 424 | // scoped. The executes that reference the statement by id on |
| 425 | // the same connection may land in a different test's window, so |
| 426 | // tagging as per-test ("mocks") would have the strict-window |
| 427 | // filter drop the setup and break replay. Tagging as session |
| 428 | // ("config") would share it across unrelated connections, |
no test coverage detected