MCPcopy
hub / github.com/PeerDB-io/peerdb / processCDCFlowConfigUpdate

Function processCDCFlowConfigUpdate

flow/workflows/cdc_flow.go:94–169  ·  view source on GitHub ↗
(
	ctx workflow.Context,
	logger log.Logger,
	cfg *protos.FlowConnectionConfigsCore,
	state *cdc_state.CDCFlowWorkflowState,
	mirrorNameSearch temporal.SearchAttributes,
)

Source from the content-addressed store, hash-verified

92}
93
94func processCDCFlowConfigUpdate(
95 ctx workflow.Context,
96 logger log.Logger,
97 cfg *protos.FlowConnectionConfigsCore,
98 state *cdc_state.CDCFlowWorkflowState,
99 mirrorNameSearch temporal.SearchAttributes,
100) error {
101 flowConfigUpdate := state.FlowConfigUpdate
102
103 // Capture old values for logging before updates are applied
104 oldValues := telemetry.OldCDCFlowValues{
105 BatchSize: state.SyncFlowOptions.BatchSize,
106 IdleTimeout: state.SyncFlowOptions.IdleTimeoutSeconds,
107 SnapshotNumRowsPerPartition: state.SnapshotNumRowsPerPartition,
108 SnapshotNumPartitionsOverride: state.SnapshotNumPartitionsOverride,
109 SnapshotMaxParallelWorkers: state.SnapshotMaxParallelWorkers,
110 SnapshotNumTablesInParallel: state.SnapshotNumTablesInParallel,
111 }
112 if len(flowConfigUpdate.UpdatedEnv) > 0 {
113 oldValues.Env = make(map[string]string, len(flowConfigUpdate.UpdatedEnv))
114 if cfg.Env != nil {
115 for key := range flowConfigUpdate.UpdatedEnv {
116 oldValues.Env[key] = cfg.Env[key]
117 }
118 }
119 }
120
121 if flowConfigUpdate.BatchSize > 0 {
122 state.SyncFlowOptions.BatchSize = flowConfigUpdate.BatchSize
123 }
124 if flowConfigUpdate.IdleTimeout > 0 {
125 state.SyncFlowOptions.IdleTimeoutSeconds = flowConfigUpdate.IdleTimeout
126 }
127 if flowConfigUpdate.UpdatedEnv != nil {
128 if cfg.Env == nil {
129 cfg.Env = make(map[string]string, len(flowConfigUpdate.UpdatedEnv))
130 }
131 maps.Copy(cfg.Env, flowConfigUpdate.UpdatedEnv)
132 }
133 if flowConfigUpdate.SnapshotNumRowsPerPartition > 0 {
134 state.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition
135 }
136 if flowConfigUpdate.SnapshotNumPartitionsOverride > 0 {
137 state.SnapshotNumPartitionsOverride = flowConfigUpdate.SnapshotNumPartitionsOverride
138 }
139 if flowConfigUpdate.SnapshotMaxParallelWorkers > 0 {
140 state.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers
141 }
142 if flowConfigUpdate.SnapshotNumTablesInParallel > 0 {
143 state.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel
144 }
145
146 tablesAreAdded := len(flowConfigUpdate.AdditionalTables) > 0
147 tablesAreRemoved := len(flowConfigUpdate.RemovedTables) > 0
148 if tablesAreAdded || tablesAreRemoved {
149 logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate))
150
151 if tablesAreAdded {

Callers 1

CDCFlowWorkflowFunction · 0.85

Calls 5

processTableAdditionsFunction · 0.85
processTableRemovalsFunction · 0.85
InfoMethod · 0.80
ErrorMethod · 0.45

Tested by

no test coverage detected