( ctx workflow.Context, logger log.Logger, cfg *protos.FlowConnectionConfigsCore, state *cdc_state.CDCFlowWorkflowState, mirrorNameSearch temporal.SearchAttributes, )
| 92 | } |
| 93 | |
| 94 | func processCDCFlowConfigUpdate( |
| 95 | ctx workflow.Context, |
| 96 | logger log.Logger, |
| 97 | cfg *protos.FlowConnectionConfigsCore, |
| 98 | state *cdc_state.CDCFlowWorkflowState, |
| 99 | mirrorNameSearch temporal.SearchAttributes, |
| 100 | ) error { |
| 101 | flowConfigUpdate := state.FlowConfigUpdate |
| 102 | |
| 103 | // Capture old values for logging before updates are applied |
| 104 | oldValues := telemetry.OldCDCFlowValues{ |
| 105 | BatchSize: state.SyncFlowOptions.BatchSize, |
| 106 | IdleTimeout: state.SyncFlowOptions.IdleTimeoutSeconds, |
| 107 | SnapshotNumRowsPerPartition: state.SnapshotNumRowsPerPartition, |
| 108 | SnapshotNumPartitionsOverride: state.SnapshotNumPartitionsOverride, |
| 109 | SnapshotMaxParallelWorkers: state.SnapshotMaxParallelWorkers, |
| 110 | SnapshotNumTablesInParallel: state.SnapshotNumTablesInParallel, |
| 111 | } |
| 112 | if len(flowConfigUpdate.UpdatedEnv) > 0 { |
| 113 | oldValues.Env = make(map[string]string, len(flowConfigUpdate.UpdatedEnv)) |
| 114 | if cfg.Env != nil { |
| 115 | for key := range flowConfigUpdate.UpdatedEnv { |
| 116 | oldValues.Env[key] = cfg.Env[key] |
| 117 | } |
| 118 | } |
| 119 | } |
| 120 | |
| 121 | if flowConfigUpdate.BatchSize > 0 { |
| 122 | state.SyncFlowOptions.BatchSize = flowConfigUpdate.BatchSize |
| 123 | } |
| 124 | if flowConfigUpdate.IdleTimeout > 0 { |
| 125 | state.SyncFlowOptions.IdleTimeoutSeconds = flowConfigUpdate.IdleTimeout |
| 126 | } |
| 127 | if flowConfigUpdate.UpdatedEnv != nil { |
| 128 | if cfg.Env == nil { |
| 129 | cfg.Env = make(map[string]string, len(flowConfigUpdate.UpdatedEnv)) |
| 130 | } |
| 131 | maps.Copy(cfg.Env, flowConfigUpdate.UpdatedEnv) |
| 132 | } |
| 133 | if flowConfigUpdate.SnapshotNumRowsPerPartition > 0 { |
| 134 | state.SnapshotNumRowsPerPartition = flowConfigUpdate.SnapshotNumRowsPerPartition |
| 135 | } |
| 136 | if flowConfigUpdate.SnapshotNumPartitionsOverride > 0 { |
| 137 | state.SnapshotNumPartitionsOverride = flowConfigUpdate.SnapshotNumPartitionsOverride |
| 138 | } |
| 139 | if flowConfigUpdate.SnapshotMaxParallelWorkers > 0 { |
| 140 | state.SnapshotMaxParallelWorkers = flowConfigUpdate.SnapshotMaxParallelWorkers |
| 141 | } |
| 142 | if flowConfigUpdate.SnapshotNumTablesInParallel > 0 { |
| 143 | state.SnapshotNumTablesInParallel = flowConfigUpdate.SnapshotNumTablesInParallel |
| 144 | } |
| 145 | |
| 146 | tablesAreAdded := len(flowConfigUpdate.AdditionalTables) > 0 |
| 147 | tablesAreRemoved := len(flowConfigUpdate.RemovedTables) > 0 |
| 148 | if tablesAreAdded || tablesAreRemoved { |
| 149 | logger.Info("processing CDCFlowConfigUpdate", slog.Any("updatedState", flowConfigUpdate)) |
| 150 | |
| 151 | if tablesAreAdded { |
no test coverage detected