MCPcopy Index your code
hub / github.com/cloudquery/cloudquery / writeResources

Method writeResources

plugins/destination/mysql/client/write.go:60–115  ·  view source on GitHub ↗
(ctx context.Context, overwrite bool, table *schema.Table, msgs message.WriteInserts)

Source from the content-addressed store, hash-verified

58}
59
60func (c *Client) writeResources(ctx context.Context, overwrite bool, table *schema.Table, msgs message.WriteInserts) error {
61 tablesWithTruncation := make(map[string]bool)
62 pks := make([]int, 0)
63 for i, col := range table.Columns {
64 if !col.PrimaryKey {
65 continue
66 }
67 sqlType := arrowTypeToMySqlStr(col.Type)
68 if sqlType != "blob" && sqlType != "text" {
69 continue
70 }
71 // only if the PK is a blob or a text do we care about the length of the data
72 pks = append(pks, i)
73 }
74
75 rowCount := 0
76 queryArgs := []any{}
77 for _, msg := range msgs {
78 rec := msg.Record
79 transformedRecords, err := transformRecord(rec)
80 if err != nil {
81 return err
82 }
83
84 for _, record := range transformedRecords {
85 for _, truncatablePKIndex := range pks {
86 // log a warning that a blob or text field that is a PK has more than the limit
87 lengthPerPk := c.maxIndexLength / len(pks)
88 if len(record[truncatablePKIndex].(string)) > lengthPerPk {
89 indexes := table.PrimaryKeysIndexes()
90 pkValues := make(map[string]any, len(indexes))
91 for i, pkIndex := range indexes {
92 pkValues[table.Columns[pkIndex].Name] = record[i]
93 }
94 c.logger.Debug().Any("pk_values", pkValues).Msgf("record contains a primary key that is longer than MySQL can handle. only the first %d will be included in the index", lengthPerPk)
95 tablesWithTruncation[table.Name] = true
96 break
97 }
98 }
99 }
100
101 for _, transformedRecord := range transformedRecords {
102 queryArgs = append(queryArgs, transformedRecord...)
103 rowCount++
104 }
105 }
106
107 query := getInsertQueryBuild(table, rowCount, overwrite)
108 _, err := c.db.ExecContext(ctx, query.String(), queryArgs...)
109 if err != nil {
110 logTablesWithTruncation(c.logger, tablesWithTruncation)
111 return err
112 }
113 logTablesWithTruncation(c.logger, tablesWithTruncation)
114 return nil
115}
116
117func (c *Client) appendTableBatch(ctx context.Context, resources message.WriteInserts) error {

Callers 2

appendTableBatchMethod · 0.95
overwriteTableBatchMethod · 0.95

Calls 5

arrowTypeToMySqlStrFunction · 0.85
getInsertQueryBuildFunction · 0.85
logTablesWithTruncationFunction · 0.85
transformRecordFunction · 0.70
StringMethod · 0.45

Tested by

no test coverage detected