nolint:dupl
(ctx context.Context, migrateOptions migrateV3Options)
| 26 | |
| 27 | //nolint:dupl |
| 28 | func migrateConnectionV3(ctx context.Context, migrateOptions migrateV3Options) error { |
| 29 | var ( |
| 30 | sourceClient = migrateOptions.sourceClient |
| 31 | destinationsClients = migrateOptions.destinationsClients |
| 32 | sourceSpec = migrateOptions.sourceSpec |
| 33 | destinationSpecs = migrateOptions.destinationSpecs |
| 34 | transformersForDestination = migrateOptions.transformersForDestination |
| 35 | transformerSpecsByName = migrateOptions.transformerSpecsByName |
| 36 | cqColumnsNotNull = migrateOptions.cqColumnsNotNull |
| 37 | ) |
| 38 | |
| 39 | destinationStrings := make([]string, len(destinationSpecs)) |
| 40 | for i := range destinationSpecs { |
| 41 | destinationStrings[i] = destinationSpecs[i].VersionString() |
| 42 | } |
| 43 | transformerStrings := make([]string, 0, len(transformerSpecsByName)) |
| 44 | for _, transformerSpec := range transformerSpecsByName { |
| 45 | transformerStrings = append(transformerStrings, transformerSpec.VersionString()) |
| 46 | } |
| 47 | |
| 48 | transformerPbClientsByDestination := map[string][]plugin.PluginClient{} |
| 49 | for name, transformers := range transformersForDestination { |
| 50 | for _, tf := range transformers { |
| 51 | transformerPbClientsByDestination[name] = append(transformerPbClientsByDestination[name], plugin.NewPluginClient(tf.Conn)) |
| 52 | } |
| 53 | } |
| 54 | |
| 55 | migrateStart := time.Now().UTC() |
| 56 | log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationStrings).Strs("transformers", transformerStrings).Time("migrate_time", migrateStart).Msg("Start migration") |
| 57 | defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationStrings).Strs("transformers", transformerStrings).Time("migrate_time", migrateStart).Msg("End migration") |
| 58 | |
| 59 | sourcePbClient := plugin.NewPluginClient(sourceClient.Conn) |
| 60 | destinationsPbClients := make([]plugin.PluginClient, len(destinationsClients)) |
| 61 | destinationRecordTransformers := make([]*transformer.RecordTransformer, len(destinationsClients)) |
| 62 | for i := range destinationsClients { |
| 63 | destinationsPbClients[i] = plugin.NewPluginClient(destinationsClients[i].Conn) |
| 64 | opts := []transformer.RecordTransformerOption{ |
| 65 | transformer.WithSourceNameColumn(sourceSpec.Name), |
| 66 | transformer.WithSyncTimeColumn(migrateStart), |
| 67 | } |
| 68 | if cqColumnsNotNull { |
| 69 | opts = append(opts, transformer.WithCQColumnsNotNull()) |
| 70 | } |
| 71 | if destinationSpecs[i].SyncGroupId != "" { |
| 72 | opts = append(opts, transformer.WithSyncGroupIdColumn(destinationSpecs[i].RenderedSyncGroupId(migrateStart, invocationUUID.String()))) |
| 73 | } |
| 74 | if destinationSpecs[i].WriteMode == specs.WriteModeAppend { |
| 75 | opts = append(opts, transformer.WithRemovePKs(), transformer.WithRemoveUniqueConstraints()) |
| 76 | } else if destinationSpecs[i].PKMode == specs.PKModeCQID { |
| 77 | opts = append(opts, transformer.WithRemovePKs()) |
| 78 | opts = append(opts, transformer.WithCQIDPrimaryKey()) |
| 79 | } |
| 80 | destinationRecordTransformers[i] = transformer.NewRecordTransformer(opts...) |
| 81 | } |
| 82 | |
| 83 | // initialize destinations first, so that their connections may be used as backends by the source |
| 84 | for i, destinationSpec := range destinationSpecs { |
| 85 | if err := initPlugin(ctx, destinationsPbClients[i], destinationSpec.Spec, false, invocationUUID.String()); err != nil { |
no test coverage detected