MCPcopy
hub / github.com/PeerDB-io/peerdb / processTableAdditions

Function processTableAdditions

flow/workflows/cdc_flow.go:233–349  ·  view source on GitHub ↗
(
	ctx workflow.Context,
	logger log.Logger,
	cfg *protos.FlowConnectionConfigsCore,
	state *cdc_state.CDCFlowWorkflowState,
	mirrorNameSearch temporal.SearchAttributes,
)

Source from the content-addressed store, hash-verified

231}
232
233func processTableAdditions(
234 ctx workflow.Context,
235 logger log.Logger,
236 cfg *protos.FlowConnectionConfigsCore,
237 state *cdc_state.CDCFlowWorkflowState,
238 mirrorNameSearch temporal.SearchAttributes,
239) error {
240 flowConfigUpdate := state.FlowConfigUpdate
241 if len(flowConfigUpdate.AdditionalTables) == 0 {
242 syncStateToConfigProtoInCatalog(ctx, cfg, state)
243 return nil
244 }
245 checkDestinationOverlap := !getClickHouseInitialLoadAllowNonEmptyTables(ctx, logger, cfg.Env)
246 if internal.AdditionalTablesHasOverlap(
247 state.SyncFlowOptions.TableMappings, flowConfigUpdate.AdditionalTables, checkDestinationOverlap,
248 ) {
249 logger.Warn("duplicate source/destination tables found in additionalTables")
250 syncStateToConfigProtoInCatalog(ctx, cfg, state)
251 return nil
252 }
253 state.UpdateStatus(ctx, logger, protos.FlowStatus_STATUS_SNAPSHOT)
254
255 addTablesSelector := workflow.NewNamedSelector(ctx, "AddTables")
256 addTablesSelector.AddReceive(ctx.Done(), func(_ workflow.ReceiveChannel, _ bool) {})
257 flowSignalStateChangeChan := model.FlowSignalStateChange.GetSignalChannel(ctx)
258 flowSignalStateChangeChan.AddToSelector(addTablesSelector, handleFlowSignalStateChange(ctx, cfg, state, logger, "AddTables"))
259
260 logger.Info("altering publication for additional tables")
261 alterPublicationAddAdditionalTablesCtx := workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
262 StartToCloseTimeout: 1 * time.Hour,
263 HeartbeatTimeout: 5 * time.Minute,
264 })
265 alterPublicationAddAdditionalTablesFuture := workflow.ExecuteActivity(
266 alterPublicationAddAdditionalTablesCtx,
267 flowable.AddTablesToPublication,
268 cfg, flowConfigUpdate.AdditionalTables)
269
270 var res *CDCFlowWorkflowResult
271 var addTablesFlowErr error
272 addTablesSelector.AddFuture(alterPublicationAddAdditionalTablesFuture, func(f workflow.Future) {
273 addTablesFlowErr = f.Get(alterPublicationAddAdditionalTablesCtx, f)
274 if addTablesFlowErr == nil {
275 logger.Info("additional tables added to publication")
276 additionalTablesUUID := GetUUID(ctx)
277 childAdditionalTablesCDCFlowID := GetChildWorkflowID(
278 additionalTablesCDCFlowPrefix,
279 cfg.FlowJobName,
280 additionalTablesUUID,
281 )
282 additionalTablesCfg := proto.CloneOf(cfg)
283 additionalTablesCfg.DoInitialSnapshot = !flowConfigUpdate.SkipInitialSnapshotForTableAdditions
284 additionalTablesCfg.InitialSnapshotOnly = true
285 additionalTablesCfg.TableMappings = flowConfigUpdate.AdditionalTables
286 additionalTablesCfg.Resync = false
287 if state.SnapshotNumRowsPerPartition > 0 {
288 additionalTablesCfg.SnapshotNumRowsPerPartition = state.SnapshotNumRowsPerPartition
289 }
290 if state.SnapshotNumPartitionsOverride > 0 {

Callers 1

Calls 14

GetUUID · Function · 0.85
GetChildWorkflowID · Function · 0.85
Warn · Method · 0.80
UpdateStatus · Method · 0.80
GetSignalChannel · Method · 0.80
AddToSelector · Method · 0.80
Info · Method · 0.80
ExecuteActivity · Method · 0.80
Err · Method · 0.65

Tested by

no test coverage detected