(ctx context.Context, syncOptions syncV3Options)
| 136 | } |
| 137 | |
| 138 | func syncConnectionV3(ctx context.Context, syncOptions syncV3Options) (syncErr error) { |
| 139 | var ( |
| 140 | source = syncOptions.source |
| 141 | destinations = syncOptions.destinations |
| 142 | transformersByDestination = syncOptions.transformersByDestination |
| 143 | backend = syncOptions.backend |
| 144 | uid = syncOptions.uid |
| 145 | noMigrate = syncOptions.noMigrate |
| 146 | summaryLocation = syncOptions.summaryLocation |
| 147 | shard = syncOptions.shard |
| 148 | cqColumnsNotNull = syncOptions.cqColumnsNotNull |
| 149 | ) |
| 150 | |
| 151 | var mt metrics.Metrics |
| 152 | var exitReason = ExitReasonStopped |
| 153 | skippedFromDeleteStale := make(map[string]bool) |
| 154 | tablesForDeleteStale := make(map[string]bool) |
| 155 | |
| 156 | sourceSpec := source.spec |
| 157 | sourceClient := source.client |
| 158 | destinationSpecs := make([]specs.Destination, len(destinations)) |
| 159 | destinationsClients := make([]*managedplugin.Client, len(destinations)) |
| 160 | for i := range destinations { |
| 161 | destinationSpecs[i] = destinations[i].spec |
| 162 | destinationsClients[i] = destinations[i].client |
| 163 | } |
| 164 | |
| 165 | syncStartedEvent := analytics.SyncStartedEvent{ |
| 166 | Source: sourceSpec, |
| 167 | Destinations: destinationSpecs, |
| 168 | } |
| 169 | if shard != nil { |
| 170 | syncStartedEvent.ShardNum = int(shard.num) |
| 171 | syncStartedEvent.ShardTotal = int(shard.total) |
| 172 | } |
| 173 | analytics.TrackSyncStarted(ctx, invocationUUID.UUID, syncStartedEvent) |
| 174 | var ( |
| 175 | syncTimeTook time.Duration |
| 176 | totalResources = int64(0) |
| 177 | totals = sourceClient.Metrics() |
| 178 | statsPerTable = utils.NewConcurrentMap[string, SyncRunTableProgressValue]() |
| 179 | ) |
| 180 | defer func() { |
| 181 | analytics.TrackSyncCompleted(ctx, invocationUUID.UUID, analytics.SyncFinishedEvent{ |
| 182 | SyncStartedEvent: syncStartedEvent, |
| 183 | Errors: totals.Errors, |
| 184 | Warnings: totals.Warnings, |
| 185 | Duration: syncTimeTook, |
| 186 | ResourceCount: totalResources, |
| 187 | AbortedDueToError: syncErr, |
| 188 | }) |
| 189 | }() |
| 190 | |
| 191 | defer func() { |
| 192 | if oldAnalyticsClient != nil { |
| 193 | log.Info().Msg("Sending sync summary to " + oldAnalyticsClient.Host()) |
| 194 | if err := oldAnalyticsClient.SendSyncMetrics(context.Background(), sourceSpec, destinationSpecs, uid, &mt, exitReason); err != nil { |
| 195 | log.Warn().Err(err).Msg("Failed to send sync summary") |
no test coverage detected