( ctx workflow.Context, logger log.Logger, cfg *protos.FlowConnectionConfigsCore, state *cdc_state.CDCFlowWorkflowState, mirrorNameSearch temporal.SearchAttributes, )
| 231 | } |
| 232 | |
| 233 | func processTableAdditions( |
| 234 | ctx workflow.Context, |
| 235 | logger log.Logger, |
| 236 | cfg *protos.FlowConnectionConfigsCore, |
| 237 | state *cdc_state.CDCFlowWorkflowState, |
| 238 | mirrorNameSearch temporal.SearchAttributes, |
| 239 | ) error { |
| 240 | flowConfigUpdate := state.FlowConfigUpdate |
| 241 | if len(flowConfigUpdate.AdditionalTables) == 0 { |
| 242 | syncStateToConfigProtoInCatalog(ctx, cfg, state) |
| 243 | return nil |
| 244 | } |
| 245 | checkDestinationOverlap := !getClickHouseInitialLoadAllowNonEmptyTables(ctx, logger, cfg.Env) |
| 246 | if internal.AdditionalTablesHasOverlap( |
| 247 | state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables, checkDestinationOverlap, |
| 248 | ) { |
| 249 | logger.Warn("duplicate source/destination tables found in additionalTables") |
| 250 | syncStateToConfigProtoInCatalog(ctx, cfg, state) |
| 251 | return nil |
| 252 | } |
| 253 | state.UpdateStatus(ctx, logger, protos.FlowStatus_STATUS_SNAPSHOT) |
| 254 | |
| 255 | addTablesSelector := workflow.NewNamedSelector(ctx, "AddTables") |
| 256 | addTablesSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {}) |
| 257 | flowSignalStateChangeChan := model.FlowSignalStateChange.GetSignalChannel(ctx) |
| 258 | flowSignalStateChangeChan.AddToSelector(addTablesSelector, handleFlowSignalStateChange(ctx, cfg, state, logger, "AddTables")) |
| 259 | |
| 260 | logger.Info("altering publication for additional tables") |
| 261 | alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{ |
| 262 | StartToCloseTimeout: 1 * time.Hour, |
| 263 | HeartbeatTimeout: 5 * time.Minute, |
| 264 | }) |
| 265 | alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity( |
| 266 | alterPublicationAddAdditionalTablesCtx, |
| 267 | flowable.AddTablesToPublication, |
| 268 | cfg, flowConfigUpdate.AdditionalTables) |
| 269 | |
| 270 | var res *CDCFlowWorkflowResult |
| 271 | var addTablesFlowErr error |
| 272 | addTablesSelector.AddFuture(alterPublicationAddAdditionalTablesFuture, func(f workflow.Future) { |
| 273 | addTablesFlowErr = f.Get(alterPublicationAddAdditionalTablesCtx, f) |
| 274 | if addTablesFlowErr == nil { |
| 275 | logger.Info("additional tables added to publication") |
| 276 | additionalTablesUUID := GetUUID(ctx) |
| 277 | childAdditionalTablesCDCFlowID := GetChildWorkflowID( |
| 278 | additionalTablesCDCFlowPrefix, |
| 279 | cfg.FlowJobName, |
| 280 | additionalTablesUUID, |
| 281 | ) |
| 282 | additionalTablesCfg := proto.CloneOf(cfg) |
| 283 | additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions |
| 284 | additionalTablesCfg.InitialSnapshotOnly = true |
| 285 | additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables |
| 286 | additionalTablesCfg.Resync = false |
| 287 | if state.SnapshotNumRowsPerPartition > 0 { |
| 288 | additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition |
| 289 | } |
| 290 | if state.SnapshotNumPartitionsOverride > 0 { |
no test coverage detected