| 247 | } |
| 248 | |
| 249 | func (h *FlowRequestHandler) CreateQRepFlow( |
| 250 | ctx context.Context, req *protos.CreateQRepFlowRequest, |
| 251 | ) (*protos.CreateQRepFlowResponse, APIError) { |
| 252 | cfg := req.QrepConfig |
| 253 | if internalVersion, err := internal.PeerDBForceInternalVersion(ctx, cfg.Env); err != nil { |
| 254 | return nil, NewInternalApiError(err) |
| 255 | } else { |
| 256 | cfg.Version = internalVersion |
| 257 | } |
| 258 | if flags, err := h.determineFlags(ctx, cfg.Env, cfg.DestinationName); err != nil { |
| 259 | return nil, NewInternalApiError(err) |
| 260 | } else { |
| 261 | cfg.Flags = flags |
| 262 | } |
| 263 | |
| 264 | workflowID := fmt.Sprintf("%s-qrepflow-%s", cfg.FlowJobName, uuid.New()) |
| 265 | workflowOptions := client.StartWorkflowOptions{ |
| 266 | ID: workflowID, |
| 267 | TaskQueue: h.peerflowTaskQueueID, |
| 268 | TypedSearchAttributes: shared.NewSearchAttributes(cfg.FlowJobName), |
| 269 | } |
| 270 | if err := h.createQRepJobEntry(ctx, req, workflowID); err != nil { |
| 271 | slog.ErrorContext(ctx, "unable to create flow job entry", |
| 272 | slog.Any("error", err), slog.String("flowName", cfg.FlowJobName)) |
| 273 | return nil, NewInternalApiError(fmt.Errorf("unable to create flow job entry: %w", err)) |
| 274 | } |
| 275 | dbtype, err := connectors.LoadPeerType(ctx, h.pool, cfg.SourceName) |
| 276 | if err != nil { |
| 277 | return nil, NewInternalApiError(err) |
| 278 | } |
| 279 | var workflowFn any |
| 280 | if dbtype == protos.DBType_POSTGRES && cfg.WatermarkColumn == "xmin" { |
| 281 | workflowFn = peerflow.XminFlowWorkflow |
| 282 | } else { |
| 283 | workflowFn = peerflow.QRepFlowWorkflow |
| 284 | } |
| 285 | |
| 286 | cfg.ParentMirrorName = cfg.FlowJobName |
| 287 | |
| 288 | if _, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, workflowFn, cfg, nil); err != nil { |
| 289 | slog.ErrorContext(ctx, "unable to start QRepFlow workflow", |
| 290 | slog.Any("error", err), slog.String("flowName", cfg.FlowJobName)) |
| 291 | return nil, NewInternalApiError(fmt.Errorf("unable to start QRepFlow workflow: %w", err)) |
| 292 | } |
| 293 | |
| 294 | telemetry.LogActivityCreateFlow(ctx, cfg.FlowJobName) |
| 295 | |
| 296 | return &protos.CreateQRepFlowResponse{ |
| 297 | WorkflowId: workflowID, |
| 298 | }, nil |
| 299 | } |
| 300 | |
| 301 | func (h *FlowRequestHandler) dropFlow( |
| 302 | ctx context.Context, |