| 156 | } |
| 157 | |
| 158 | func (h *FlowRequestHandler) CreateCDCFlow( |
| 159 | ctx context.Context, req *protos.CreateCDCFlowRequest, |
| 160 | ) (*protos.CreateCDCFlowResponse, APIError) { |
| 161 | cfg := req.ConnectionConfigs |
| 162 | if cfg == nil { |
| 163 | return nil, NewInvalidArgumentApiError(fmt.Errorf("connection configs cannot be nil")) |
| 164 | } |
| 165 | if internalVersion, err := internal.PeerDBForceInternalVersion(ctx, cfg.Env); err != nil { |
| 166 | return nil, NewInternalApiError(err) |
| 167 | } else { |
| 168 | cfg.Version = internalVersion |
| 169 | } |
| 170 | if flags, err := h.determineFlags(ctx, cfg.Env, cfg.DestinationName); err != nil { |
| 171 | return nil, NewInternalApiError(err) |
| 172 | } else { |
| 173 | cfg.Flags = flags |
| 174 | } |
| 175 | |
| 176 | if !req.AttachToExisting { |
| 177 | if exists, err := h.cdcJobEntryExists(ctx, cfg.FlowJobName); err != nil { |
| 178 | return nil, NewInternalApiError(fmt.Errorf("unable to check flow job entry: %w", err)) |
| 179 | } else if exists { |
| 180 | return nil, NewAlreadyExistsApiError(fmt.Errorf("flow already exists: %s", cfg.FlowJobName)) |
| 181 | } |
| 182 | } |
| 183 | |
| 184 | workflowID := getWorkflowID(cfg.FlowJobName) |
| 185 | desc, err := h.temporalClient.DescribeWorkflow(ctx, workflowID, "") |
| 186 | if _, ok := errors.AsType[*serviceerror.NotFound](err); err != nil && !ok { |
| 187 | return nil, NewInternalApiError(fmt.Errorf("failed to query the workflow execution: %w", err)) |
| 188 | } else if err == nil { |
| 189 | // If workflow is actively running, handle based on AttachToExisting |
| 190 | // Workflows in terminal states are fine |
| 191 | if desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_RUNNING || |
| 192 | desc.WorkflowExecutionMetadata.Status == tEnums.WORKFLOW_EXECUTION_STATUS_CONTINUED_AS_NEW { |
| 193 | if req.AttachToExisting { |
| 194 | // Idempotent attach to running workflow |
| 195 | return &protos.CreateCDCFlowResponse{ |
| 196 | WorkflowId: workflowID, |
| 197 | }, nil |
| 198 | } else { |
| 199 | // Can't create duplicate of running workflow |
| 200 | return nil, NewAlreadyExistsApiError(fmt.Errorf("workflow already exists for flow: %s", cfg.FlowJobName)) |
| 201 | } |
| 202 | } |
| 203 | } |
| 204 | // No running workflow, do the validations and start a new one |
| 205 | |
| 206 | // Use idempotent validation that skips mirror existence check |
| 207 | connectionConfigsCore := pconv.FlowConnectionConfigsToCore(req.ConnectionConfigs, 0) |
| 208 | if connectionConfigsCore.SkipValidation == nil || !*connectionConfigsCore.SkipValidation { |
| 209 | if _, err := h.validateCDCMirrorImpl(ctx, connectionConfigsCore, true); err != nil { |
| 210 | slog.ErrorContext(ctx, "validate mirror error", slog.Any("error", err)) |
| 211 | return nil, NewInternalApiError(fmt.Errorf("invalid mirror: %w", err)) |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | if resp, err := h.createCDCFlow(ctx, connectionConfigsCore, workflowID); err != nil { |