resetDBWithSessionParams will return a new sql.DB as a replacement for input `db` with new session parameters. If returned error is nil, the input `db` will be closed.
(tctx *tcontext.Context, db *sql.DB, cfg *mysql.Config, params map[string]any)
| 959 | // resetDBWithSessionParams will return a new sql.DB as a replacement for input `db` with new session parameters. |
| 960 | // If returned error is nil, the input `db` will be closed. |
| 961 | func resetDBWithSessionParams(tctx *tcontext.Context, db *sql.DB, cfg *mysql.Config, params map[string]any) (*sql.DB, error) { |
| 962 | support := make(map[string]any) |
| 963 | for k, v := range params { |
| 964 | var pv any |
| 965 | if str, ok := v.(string); ok { |
| 966 | if pvi, err := strconv.ParseInt(str, 10, 64); err == nil { |
| 967 | pv = pvi |
| 968 | } else if pvf, err := strconv.ParseFloat(str, 64); err == nil { |
| 969 | pv = pvf |
| 970 | } else { |
| 971 | pv = str |
| 972 | } |
| 973 | } else { |
| 974 | pv = v |
| 975 | } |
| 976 | s := fmt.Sprintf("SET SESSION %s = ?", k) |
| 977 | _, err := db.ExecContext(tctx, s, pv) |
| 978 | if err != nil { |
| 979 | if k == snapshotVar { |
| 980 | err = errors.Annotate(err, "fail to set snapshot for tidb, please set --consistency=none/--consistency=lock or fix snapshot problem") |
| 981 | } else if isUnknownSystemVariableErr(err) { |
| 982 | tctx.L().Info("session variable is not supported by db", zap.String("variable", k), zap.Reflect("value", v)) |
| 983 | continue |
| 984 | } |
| 985 | return nil, errors.Trace(err) |
| 986 | } |
| 987 | |
| 988 | support[k] = pv |
| 989 | } |
| 990 | |
| 991 | if cfg.Params == nil { |
| 992 | cfg.Params = make(map[string]string) |
| 993 | } |
| 994 | |
| 995 | for k, v := range support { |
| 996 | var s string |
| 997 | // Wrap string with quote to handle string with space. For example, '2020-10-20 13:41:40' |
| 998 | // For --params argument, quote doesn't matter because it doesn't affect the actual value |
| 999 | if str, ok := v.(string); ok { |
| 1000 | s = wrapStringWith(str, "'") |
| 1001 | } else { |
| 1002 | s = fmt.Sprintf("%v", v) |
| 1003 | } |
| 1004 | cfg.Params[k] = s |
| 1005 | } |
| 1006 | failpoint.Inject("SkipResetDB", func(_ failpoint.Value) { |
| 1007 | failpoint.Return(db, nil) |
| 1008 | }) |
| 1009 | |
| 1010 | db.Close() |
| 1011 | c, err := mysql.NewConnector(cfg) |
| 1012 | if err != nil { |
| 1013 | return nil, errors.Trace(err) |
| 1014 | } |
| 1015 | newDB := sql.OpenDB(c) |
| 1016 | // ping to make sure all session parameters are set correctly |
| 1017 | err = newDB.PingContext(tctx) |
| 1018 | if err != nil { |
no test coverage detected