| 251 | } |
| 252 | |
| 253 | func storeColumnInfoForTable(ctx context.Context, logger log.Logger, conn clickhouse.Conn, |
| 254 | queryTables []string, |
| 255 | tableColumnsMapping map[string][]ClickHouseColumn, |
| 256 | ) error { |
| 257 | rows, err := Query(ctx, logger, conn, |
| 258 | fmt.Sprintf("SELECT name,type,default_kind,table FROM system.columns WHERE database=currentDatabase() AND table IN (%s)", |
| 259 | strings.Join(queryTables, ","))) |
| 260 | if err != nil { |
| 261 | return fmt.Errorf("failed to get columns for destination tables: %w", err) |
| 262 | } |
| 263 | defer rows.Close() |
| 264 | for rows.Next() { |
| 265 | var tableName string |
| 266 | var clickhouseColumn ClickHouseColumn |
| 267 | if err := rows.Scan(&clickhouseColumn.Name, &clickhouseColumn.Type, &clickhouseColumn.DefaultKind, &tableName); err != nil { |
| 268 | return fmt.Errorf("failed to scan columns for tables: %w", err) |
| 269 | } |
| 270 | tableColumnsMapping[tableName] = append(tableColumnsMapping[tableName], clickhouseColumn) |
| 271 | } |
| 272 | if err := rows.Err(); err != nil { |
| 273 | return fmt.Errorf("failed to read rows: %w", err) |
| 274 | } |
| 275 | |
| 276 | return nil |
| 277 | } |
| 278 | |
| 279 | func ValidateOrderingKeys(ctx context.Context, logger log.Logger, conn clickhouse.Conn, |
| 280 | chVersion *chvproto.Version, sourceTable string, |