Create a temp table in StarRocks for dealing with some complex logic
(dc *DataConfigParams)
| 133 | |
| 134 | // createTmpTableInStarrocks creates a temp table in StarRocks for dealing with some complex logic. |
| 135 | func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string, bool, error) { |
| 136 | logger := dc.Ctx.GetLogger() |
| 137 | config := dc.Config |
| 138 | db := dc.SrcDb |
| 139 | starrocksDb := dc.DestDb |
| 140 | table := dc.SrcTableName |
| 141 | starrocksTable := dc.DestTableName |
| 142 | starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable) |
| 143 | columnMetas, err := db.GetColumns(&Table{name: table}, nil) |
| 144 | updateColumn := config.UpdateColumn |
| 145 | columnMap := make(map[string]string) |
| 146 | if err != nil { |
| 147 | if strings.Contains(err.Error(), "cached plan must not change result type") { |
| 148 | logger.Warn(err, "skip err: cached plan must not change result type") |
| 149 | columnMetas, err = db.GetColumns(&Table{name: table}, nil) |
| 150 | if err != nil { |
| 151 | return nil, "", false, err |
| 152 | } |
| 153 | } else { |
| 154 | return nil, "", false, err |
| 155 | } |
| 156 | } |
| 157 | |
| 158 | var pks, orders, columns []string |
| 159 | var separator, firstcm, firstcmName string |
| 160 | if db.Dialect() == "postgres" { |
| 161 | separator = "\"" |
| 162 | } else if db.Dialect() == "mysql" { |
| 163 | separator = "`" |
| 164 | } else { |
| 165 | return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect())) |
| 166 | } |
| 167 | tableConfig, ok := config.TableConfigs[table] |
| 168 | for _, cm := range columnMetas { |
| 169 | name := cm.Name() |
| 170 | if ok { |
| 171 | if len(tableConfig.ExcludedColumns) > 0 { |
| 172 | if slices.Contains(tableConfig.ExcludedColumns, name) { |
| 173 | continue |
| 174 | } |
| 175 | } |
| 176 | if len(tableConfig.IncludedColumns) > 0 { |
| 177 | if !slices.Contains(tableConfig.IncludedColumns, name) { |
| 178 | continue |
| 179 | } |
| 180 | } |
| 181 | } |
| 182 | if name == updateColumn { |
| 183 | // check update column to detect skip or not |
| 184 | var updatedFrom time.Time |
| 185 | err = db.All(&updatedFrom, dal.Select(updateColumn), dal.From(table), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn))) |
| 186 | if err != nil { |
| 187 | return nil, "", false, err |
| 188 | } |
| 189 | |
| 190 | var updatedTo time.Time |
| 191 | err = starrocksDb.All(&updatedTo, dal.Select(updateColumn), dal.From(starrocksTable), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn))) |
| 192 | if err != nil { |
no test coverage detected