MCPcopy Index your code
hub / github.com/cloudquery/cloudquery / syncConnectionV2

Function syncConnectionV2

cli/cmd/sync_v2.go:82–270  ·  view source on GitHub ↗

nolint:dupl

(ctx context.Context, sourceClient *managedplugin.Client, destinationsClients managedplugin.Clients, sourceSpec specs.Source, destinationSpecs []specs.Destination, uid string, noMigrate bool, destinationsVersions [][]int)

Source from the content-addressed store, hash-verified

80
81// nolint:dupl
82func syncConnectionV2(ctx context.Context, sourceClient *managedplugin.Client, destinationsClients managedplugin.Clients, sourceSpec specs.Source, destinationSpecs []specs.Destination, uid string, noMigrate bool, destinationsVersions [][]int) error {
83 var mt metrics.Metrics
84 var exitReason = ExitReasonStopped
85 defer func() {
86 if oldAnalyticsClient != nil {
87 log.Info().Msg("Sending sync summary to " + oldAnalyticsClient.Host())
88 if err := oldAnalyticsClient.SendSyncMetrics(context.Background(), sourceSpec, destinationSpecs, uid, &mt, exitReason); err != nil {
89 log.Warn().Err(err).Msg("Failed to send sync summary")
90 }
91 }
92 }()
93 // https://github.com/golang/go/issues/41087
94 syncTime := time.Now().UTC().Truncate(time.Microsecond)
95 destinationStrings := make([]string, len(destinationsClients))
96 for i := range destinationsClients {
97 destinationStrings[i] = destinationSpecs[i].VersionString()
98 }
99 log.Info().Str("source", sourceSpec.VersionString()).Strs("destinations", destinationStrings).Time("sync_time", syncTime).Msg("Start sync")
100 defer log.Info().Str("source", sourceSpec.VersionString()).Strs("destinations", destinationStrings).Time("sync_time", syncTime).Msg("End sync")
101
102 sourcePbClient := source.NewSourceClient(sourceClient.Conn)
103 destinationsPbClients := make([]destination.DestinationClient, len(destinationsClients))
104 destinationsTransformers := getSourceV2DestV3DestinationsTransformers(destinationSpecs, destinationsVersions)
105 for i := range destinationsClients {
106 destinationsPbClients[i] = destination.NewDestinationClient(destinationsClients[i].Conn)
107 }
108
109 specBytes, err := json.Marshal(CLISourceSpecToPbSpec(sourceSpec))
110 if err != nil {
111 return err
112 }
113 if _, err := sourcePbClient.Init(ctx, &source.Init_Request{
114 Spec: specBytes,
115 }); err != nil {
116 return err
117 }
118 tablesRes, err := sourcePbClient.GetDynamicTables(ctx, &source.GetDynamicTables_Request{})
119 if err != nil {
120 return err
121 }
122 for i := range destinationsClients {
123 destSpecBytes, err := json.Marshal(CLIDestinationSpecToPbSpec(destinationSpecs[i]))
124 if err != nil {
125 return err
126 }
127 if _, err := destinationsPbClients[i].Configure(ctx, &destination.Configure_Request{
128 Config: destSpecBytes,
129 }); err != nil {
130 return err
131 }
132 }
133
134 transformedSchemasBytes := make([][][]byte, 0, len(destinationsPbClients))
135 for i := range destinationsPbClients {
136 destinationSchemasBytes, err := transformSourceV2DestV3Schemas(tablesRes.Tables, destinationsTransformers[i])
137 if err != nil {
138 return err
139 }

Callers 1

sync · Function · 0.85

Calls 15

CLISourceSpecToPbSpec · Function · 0.85
progressBar · Interface · 0.85
Host · Method · 0.80
SendSyncMetrics · Method · 0.80
VersionString · Method · 0.80
Printf · Method · 0.80
Print · Method · 0.80
CloseAndRecv · Method · 0.80

Tested by

no test coverage detected