(ctx context.Context, overwrite bool, table *schema.Table, msgs message.WriteInserts)
| 58 | } |
| 59 | |
| 60 | func (c *Client) writeResources(ctx context.Context, overwrite bool, table *schema.Table, msgs message.WriteInserts) error { |
| 61 | tablesWithTruncation := make(map[string]bool) |
| 62 | pks := make([]int, 0) |
| 63 | for i, col := range table.Columns { |
| 64 | if !col.PrimaryKey { |
| 65 | continue |
| 66 | } |
| 67 | sqlType := arrowTypeToMySqlStr(col.Type) |
| 68 | if sqlType != "blob" && sqlType != "text" { |
| 69 | continue |
| 70 | } |
| 71 | // only if the PK is a blob or a text do we care about the length of the data |
| 72 | pks = append(pks, i) |
| 73 | } |
| 74 | |
| 75 | rowCount := 0 |
| 76 | queryArgs := []any{} |
| 77 | for _, msg := range msgs { |
| 78 | rec := msg.Record |
| 79 | transformedRecords, err := transformRecord(rec) |
| 80 | if err != nil { |
| 81 | return err |
| 82 | } |
| 83 | |
| 84 | for _, record := range transformedRecords { |
| 85 | for _, truncatablePKIndex := range pks { |
| 86 | // log a warning that a blob or text field that is a PK has more than the limit |
| 87 | lengthPerPk := c.maxIndexLength / len(pks) |
| 88 | if len(record[truncatablePKIndex].(string)) > lengthPerPk { |
| 89 | indexes := table.PrimaryKeysIndexes() |
| 90 | pkValues := make(map[string]any, len(indexes)) |
| 91 | for i, pkIndex := range indexes { |
| 92 | pkValues[table.Columns[pkIndex].Name] = record[i] |
| 93 | } |
| 94 | c.logger.Debug().Any("pk_values", pkValues).Msgf("record contains a primary key that is longer than MySQL can handle. only the first %d will be included in the index", lengthPerPk) |
| 95 | tablesWithTruncation[table.Name] = true |
| 96 | break |
| 97 | } |
| 98 | } |
| 99 | } |
| 100 | |
| 101 | for _, transformedRecord := range transformedRecords { |
| 102 | queryArgs = append(queryArgs, transformedRecord...) |
| 103 | rowCount++ |
| 104 | } |
| 105 | } |
| 106 | |
| 107 | query := getInsertQueryBuild(table, rowCount, overwrite) |
| 108 | _, err := c.db.ExecContext(ctx, query.String(), queryArgs...) |
| 109 | if err != nil { |
| 110 | logTablesWithTruncation(c.logger, tablesWithTruncation) |
| 111 | return err |
| 112 | } |
| 113 | logTablesWithTruncation(c.logger, tablesWithTruncation) |
| 114 | return nil |
| 115 | } |
| 116 | |
| 117 | func (c *Client) appendTableBatch(ctx context.Context, resources message.WriteInserts) error { |
no test coverage detected