processFirstResponse handles the first response packet of a MySQL command-response exchange. It returns the new decoder state.
( ctx context.Context, logger *zap.Logger, pkt []byte, decodeCtx *wire.DecodeContext, clientConn net.Conn, lastOp byte, pendingRespBundle **mysql.PacketBundle, textResultSet **mysql.TextResultSet, binaryResultSet **mysql.BinaryProtocolResultSet, stmtPrepareOk **mysql.StmtPrepareOkPacket, remainingCols *uint64, remainingParams *uint16, )
| 714 | // processFirstResponse handles the first response packet of a MySQL |
| 715 | // command-response exchange. It returns the new decoder state. |
| 716 | func processFirstResponse( |
| 717 | ctx context.Context, |
| 718 | logger *zap.Logger, |
| 719 | pkt []byte, |
| 720 | decodeCtx *wire.DecodeContext, |
| 721 | clientConn net.Conn, |
| 722 | lastOp byte, |
| 723 | pendingRespBundle **mysql.PacketBundle, |
| 724 | textResultSet **mysql.TextResultSet, |
| 725 | binaryResultSet **mysql.BinaryProtocolResultSet, |
| 726 | stmtPrepareOk **mysql.StmtPrepareOkPacket, |
| 727 | remainingCols *uint64, |
| 728 | remainingParams *uint16, |
| 729 | ) mysqlDecodeState { |
| 730 | // Try to decode the response packet. |
| 731 | commandRespPkt, err := wire.DecodePayload(ctx, logger, pkt, clientConn, decodeCtx) |
| 732 | if err != nil { |
| 733 | logger.Debug("failed to decode MySQL response in async decoder", zap.Error(err)) |
| 734 | return stateExpectCommand |
| 735 | } |
| 736 | |
| 737 | // Check if response is OK or ERR — simple single-packet response. |
| 738 | if commandRespPkt.Header.Type == mysql.StatusToString(mysql.ERR) || |
| 739 | commandRespPkt.Header.Type == mysql.StatusToString(mysql.OK) { |
| 740 | *pendingRespBundle = commandRespPkt |
| 741 | return stateExpectCommand |
| 742 | } |
| 743 | |
| 744 | // Guard: if response was decoded as a command packet, streams are desynced. |
| 745 | if isCommandPacket(commandRespPkt.Message) { |
| 746 | logger.Debug("Response decoded as command packet — stream desync detected", |
| 747 | zap.String("responseType", commandRespPkt.Header.Type)) |
| 748 | decodeCtx.LastOp.Store(clientConn, wire.RESET) |
| 749 | *pendingRespBundle = commandRespPkt |
| 750 | return stateExpectCommand |
| 751 | } |
| 752 | |
| 753 | // Multi-packet response — determine type based on lastOp. |
| 754 | switch lastOp { |
| 755 | case mysql.COM_QUERY: |
| 756 | ts, ok := commandRespPkt.Message.(*mysql.TextResultSet) |
| 757 | if !ok { |
| 758 | logger.Debug("expected TextResultSet", |
| 759 | zap.String("got", fmt.Sprintf("%T", commandRespPkt.Message))) |
| 760 | *pendingRespBundle = commandRespPkt |
| 761 | return stateExpectCommand |
| 762 | } |
| 763 | *textResultSet = ts |
| 764 | *binaryResultSet = nil |
| 765 | *stmtPrepareOk = nil |
| 766 | *remainingCols = ts.ColumnCount |
| 767 | *pendingRespBundle = commandRespPkt |
| 768 | return stateExpectColumns |
| 769 | |
| 770 | case mysql.COM_STMT_PREPARE: |
| 771 | sp, ok := commandRespPkt.Message.(*mysql.StmtPrepareOkPacket) |
| 772 | if !ok { |
| 773 | logger.Debug("expected StmtPrepareOkPacket", |
no test coverage detected