MCPcopy Index your code
hub / github.com/cloudquery/cloudquery / syncConnectionV3

Function syncConnectionV3

cli/cmd/sync_v3.go:138–733  ·  view source on GitHub ↗
(ctx context.Context, syncOptions syncV3Options)

Source from the content-addressed store, hash-verified

136}
137
138func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr error) {
139 var (
140 source = syncOptions.source
141 destinations = syncOptions.destinations
142 transformersByDestination = syncOptions.transformersByDestination
143 backend = syncOptions.backend
144 uid = syncOptions.uid
145 noMigrate = syncOptions.noMigrate
146 summaryLocation = syncOptions.summaryLocation
147 shard = syncOptions.shard
148 cqColumnsNotNull = syncOptions.cqColumnsNotNull
149 )
150
151 var mt metrics.Metrics
152 var exitReason = ExitReasonStopped
153 skippedFromDeleteStale := make(map[string]bool)
154 tablesForDeleteStale := make(map[string]bool)
155
156 sourceSpec := source.spec
157 sourceClient := source.client
158 destinationSpecs := make([]specs.Destination, len(destinations))
159 destinationsClients := make([]*managedplugin.Client, len(destinations))
160 for i := range destinations {
161 destinationSpecs[i] = destinations[i].spec
162 destinationsClients[i] = destinations[i].client
163 }
164
165 syncStartedEvent := analytics.SyncStartedEvent{
166 Source: sourceSpec,
167 Destinations: destinationSpecs,
168 }
169 if shard != nil {
170 syncStartedEvent.ShardNum = int(shard.num)
171 syncStartedEvent.ShardTotal = int(shard.total)
172 }
173 analytics.TrackSyncStarted(ctx, invocationUUID.UUID, syncStartedEvent)
174 var (
175 syncTimeTook time.Duration
176 totalResources = int64(0)
177 totals = sourceClient.Metrics()
178 statsPerTable = utils.NewConcurrentMap[string, SyncRunTableProgressValue]()
179 )
180 defer func() {
181 analytics.TrackSyncCompleted(ctx, invocationUUID.UUID, analytics.SyncFinishedEvent{
182 SyncStartedEvent: syncStartedEvent,
183 Errors: totals.Errors,
184 Warnings: totals.Warnings,
185 Duration: syncTimeTook,
186 ResourceCount: totalResources,
187 AbortedDueToError: syncErr,
188 })
189 }()
190
191 defer func() {
192 if oldAnalyticsClient != nil {
193 log.Info().Msg("Sending sync summary to " + oldAnalyticsClient.Host())
194 if err := oldAnalyticsClient.SendSyncMetrics(context.Background(), sourceSpec, destinationSpecs, uid, &mt, exitReason); err != nil {
195 log.Warn().Err(err).Msg("Failed to send sync summary")

Callers 1

sync (Function) · 0.85

Calls 15

newSafeWriteClient (Function) · 0.85
progressBar (Interface) · 0.85
handleSendError (Function) · 0.85
createTableNameSchema (Function) · 0.85
deleteStale (Function) · 0.85
persistSummary (Function) · 0.85
sendSummary (Function) · 0.85
hintSelectMessage (Function) · 0.85
Host (Method) · 0.80
SendSyncMetrics (Method) · 0.80
VersionString (Method) · 0.80

Tested by

no test coverage detected