MCPcopy Index your code
hub / github.com/apache/devlake / createTmpTableInStarrocks

Function createTmpTableInStarrocks

backend/plugins/starrocks/tasks/tasks.go:135–260  ·  view source on GitHub ↗

Creates a temporary table in StarRocks for handling complex logic.

(dc *DataConfigParams)

Source from the content-addressed store, hash-verified

133
134// create temp table for dealing with some complex logic
135func createTmpTableInStarrocks(dc *DataConfigParams) (map[string]string, string, bool, error) {
136 logger := dc.Ctx.GetLogger()
137 config := dc.Config
138 db := dc.SrcDb
139 starrocksDb := dc.DestDb
140 table := dc.SrcTableName
141 starrocksTable := dc.DestTableName
142 starrocksTmpTable := fmt.Sprintf("%s_tmp", starrocksTable)
143 columnMetas, err := db.GetColumns(&Table{name: table}, nil)
144 updateColumn := config.UpdateColumn
145 columnMap := make(map[string]string)
146 if err != nil {
147 if strings.Contains(err.Error(), "cached plan must not change result type") {
148 logger.Warn(err, "skip err: cached plan must not change result type")
149 columnMetas, err = db.GetColumns(&Table{name: table}, nil)
150 if err != nil {
151 return nil, "", false, err
152 }
153 } else {
154 return nil, "", false, err
155 }
156 }
157
158 var pks, orders, columns []string
159 var separator, firstcm, firstcmName string
160 if db.Dialect() == "postgres" {
161 separator = "\""
162 } else if db.Dialect() == "mysql" {
163 separator = "`"
164 } else {
165 return nil, "", false, errors.NotFound.New(fmt.Sprintf("unsupported dialect %s", db.Dialect()))
166 }
167 tableConfig, ok := config.TableConfigs[table]
168 for _, cm := range columnMetas {
169 name := cm.Name()
170 if ok {
171 if len(tableConfig.ExcludedColumns) > 0 {
172 if slices.Contains(tableConfig.ExcludedColumns, name) {
173 continue
174 }
175 }
176 if len(tableConfig.IncludedColumns) > 0 {
177 if !slices.Contains(tableConfig.IncludedColumns, name) {
178 continue
179 }
180 }
181 }
182 if name == updateColumn {
183 // check update column to detect skip or not
184 var updatedFrom time.Time
185 err = db.All(&updatedFrom, dal.Select(updateColumn), dal.From(table), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn)))
186 if err != nil {
187 return nil, "", false, err
188 }
189
190 var updatedTo time.Time
191 err = starrocksDb.All(&updatedTo, dal.Select(updateColumn), dal.From(starrocksTable), dal.Limit(1), dal.Orderby(fmt.Sprintf("%s desc", updateColumn)))
192 if err != nil {

Callers 1

ExportData (Function) · 0.85

Calls 14

ColumnType (Method) · 0.80
PrimaryKey (Method) · 0.80
GetLogger (Method) · 0.65
GetColumns (Method) · 0.65
Error (Method) · 0.65
Warn (Method) · 0.65
Dialect (Method) · 0.65
New (Method) · 0.65
Name (Method) · 0.65
All (Method) · 0.65
From (Method) · 0.65
Count (Method) · 0.65

Tested by

no test coverage detected