MCPcopy Index your code
hub / github.com/cloudquery/cloudquery / migrateConnectionV3

Function migrateConnectionV3

cli/cmd/migrate_v3.go:28–178  ·  view source on GitHub ↗

nolint:dupl

(ctx context.Context, migrateOptions migrateV3Options)

Source from the content-addressed store, hash-verified

26
27// nolint:dupl
28func migrateConnectionV3(ctx context.Context, migrateOptions migrateV3Options) error {
29 var (
30 sourceClient = migrateOptions.sourceClient
31 destinationsClients = migrateOptions.destinationsClients
32 sourceSpec = migrateOptions.sourceSpec
33 destinationSpecs = migrateOptions.destinationSpecs
34 transformersForDestination = migrateOptions.transformersForDestination
35 transformerSpecsByName = migrateOptions.transformerSpecsByName
36 cqColumnsNotNull = migrateOptions.cqColumnsNotNull
37 )
38
39 destinationStrings := make([]string, len(destinationSpecs))
40 for i := range destinationSpecs {
41 destinationStrings[i] = destinationSpecs[i].VersionString()
42 }
43 transformerStrings := make([]string, 0, len(transformerSpecsByName))
44 for _, transformerSpec := range transformerSpecsByName {
45 transformerStrings = append(transformerStrings, transformerSpec.VersionString())
46 }
47
48 transformerPbClientsByDestination := map[string][]plugin.PluginClient{}
49 for name, transformers := range transformersForDestination {
50 for _, tf := range transformers {
51 transformerPbClientsByDestination[name] = append(transformerPbClientsByDestination[name], plugin.NewPluginClient(tf.Conn))
52 }
53 }
54
55 migrateStart := time.Now().UTC()
56 log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationStrings).Strs("transformers", transformerStrings).Time("migrate_time", migrateStart).Msg("Start migration")
57 defer log.Info().Str("source", sourceSpec.Name).Strs("destinations", destinationStrings).Strs("transformers", transformerStrings).Time("migrate_time", migrateStart).Msg("End migration")
58
59 sourcePbClient := plugin.NewPluginClient(sourceClient.Conn)
60 destinationsPbClients := make([]plugin.PluginClient, len(destinationsClients))
61 destinationRecordTransformers := make([]*transformer.RecordTransformer, len(destinationsClients))
62 for i := range destinationsClients {
63 destinationsPbClients[i] = plugin.NewPluginClient(destinationsClients[i].Conn)
64 opts := []transformer.RecordTransformerOption{
65 transformer.WithSourceNameColumn(sourceSpec.Name),
66 transformer.WithSyncTimeColumn(migrateStart),
67 }
68 if cqColumnsNotNull {
69 opts = append(opts, transformer.WithCQColumnsNotNull())
70 }
71 if destinationSpecs[i].SyncGroupId != "" {
72 opts = append(opts, transformer.WithSyncGroupIdColumn(destinationSpecs[i].RenderedSyncGroupId(migrateStart, invocationUUID.String())))
73 }
74 if destinationSpecs[i].WriteMode == specs.WriteModeAppend {
75 opts = append(opts, transformer.WithRemovePKs(), transformer.WithRemoveUniqueConstraints())
76 } else if destinationSpecs[i].PKMode == specs.PKModeCQID {
77 opts = append(opts, transformer.WithRemovePKs())
78 opts = append(opts, transformer.WithCQIDPrimaryKey())
79 }
80 destinationRecordTransformers[i] = transformer.NewRecordTransformer(opts...)
81 }
82
83 // initialize destinations first, so that their connections may be used as backends by the source
84 for i, destinationSpec := range destinationSpecs {
85 if err := initPlugin(ctx, destinationsPbClients[i], destinationSpec.Spec, false, invocationUUID.String()); err != nil {

Callers 1

migrateFunction · 0.85

Calls 15

newSafeWriteClientFunction · 0.85
handleSendErrorFunction · 0.85
migrateSummaryTableFunction · 0.85
VersionStringMethod · 0.80
RenderedSyncGroupIdMethod · 0.80
ErrorfMethod · 0.80
PrintfMethod · 0.80
CloseAndRecvMethod · 0.80
PrintlnMethod · 0.80
initPluginFunction · 0.70
WriteMethod · 0.65
CloseMethod · 0.65

Tested by

no test coverage detected