( ctx context.Context, connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string, )
| 221 | } |
| 222 | |
| 223 | func (h *FlowRequestHandler) createCDCFlow( |
| 224 | ctx context.Context, connectionConfigs *protos.FlowConnectionConfigsCore, workflowID string, |
| 225 | ) (*protos.CreateCDCFlowResponse, error) { |
| 226 | workflowOptions := client.StartWorkflowOptions{ |
| 227 | ID: workflowID, |
| 228 | TaskQueue: h.peerflowTaskQueueID, |
| 229 | TypedSearchAttributes: shared.NewSearchAttributes(connectionConfigs.FlowJobName), |
| 230 | WorkflowIDConflictPolicy: tEnums.WORKFLOW_ID_CONFLICT_POLICY_USE_EXISTING, // two racing requests end up with the same workflow |
| 231 | WorkflowIDReusePolicy: tEnums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE, // but creating the same id as a completed one is allowed |
| 232 | } |
| 233 | |
| 234 | if err := h.createCdcJobEntry(ctx, connectionConfigs, workflowID, true); err != nil { |
| 235 | slog.ErrorContext(ctx, "unable to create flow job entry", slog.Any("error", err)) |
| 236 | return nil, fmt.Errorf("unable to create flow job entry: %w", err) |
| 237 | } |
| 238 | |
| 239 | if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.CDCFlowWorkflow, connectionConfigs, nil); err != nil { |
| 240 | slog.ErrorContext(ctx, "unable to start PeerFlow workflow", slog.Any("error", err)) |
| 241 | return nil, fmt.Errorf("unable to start PeerFlow workflow: %w", err) |
| 242 | } |
| 243 | |
| 244 | return &protos.CreateCDCFlowResponse{ |
| 245 | WorkflowId: workflowID, |
| 246 | }, nil |
| 247 | } |
| 248 | |
| 249 | func (h *FlowRequestHandler) CreateQRepFlow( |
| 250 | ctx context.Context, req *protos.CreateQRepFlowRequest, |
no test coverage detected