( ctx context.Context, flowJobName string, deleteStats bool, )
| 299 | } |
| 300 | |
| 301 | func (h *FlowRequestHandler) dropFlow( |
| 302 | ctx context.Context, |
| 303 | flowJobName string, |
| 304 | deleteStats bool, |
| 305 | ) error { |
| 306 | logs := slog.Group("shutdown-log", slog.String(string(shared.FlowNameKey), flowJobName)) |
| 307 | |
| 308 | isCdc, err := h.isCDCFlow(ctx, flowJobName) |
| 309 | if err != nil { |
| 310 | slog.ErrorContext(ctx, "unable to check if workflow is cdc", logs, slog.Any("error", err)) |
| 311 | return fmt.Errorf("unable to determine if workflow is cdc: %w", err) |
| 312 | } |
| 313 | var cdcConfig *protos.FlowConnectionConfigs |
| 314 | if isCdc { |
| 315 | cdcConfig, err = h.getFlowConfigFromCatalog(ctx, flowJobName) |
| 316 | if err != nil { |
| 317 | slog.ErrorContext(ctx, "unable to get cdc config from catalog", logs, slog.Any("error", err)) |
| 318 | return fmt.Errorf("unable to get cdc config from catalog: %w", err) |
| 319 | } |
| 320 | } |
| 321 | |
| 322 | dropFlowWorkflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New()) |
| 323 | workflowOptions := client.StartWorkflowOptions{ |
| 324 | ID: dropFlowWorkflowID, |
| 325 | TaskQueue: h.peerflowTaskQueueID, |
| 326 | TypedSearchAttributes: shared.NewSearchAttributes(flowJobName), |
| 327 | } |
| 328 | |
| 329 | if dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, &protos.DropFlowInput{ |
| 330 | FlowJobName: flowJobName, |
| 331 | DropFlowStats: deleteStats, |
| 332 | FlowConnectionConfigs: pconv.FlowConnectionConfigsToCore(cdcConfig, 0), |
| 333 | SkipDestinationDrop: true, |
| 334 | SkipSourceDrop: true, |
| 335 | }); err != nil { |
| 336 | slog.ErrorContext(ctx, "unable to start DropFlow workflow", logs, slog.Any("error", err)) |
| 337 | return fmt.Errorf("unable to start DropFlow workflow: %w", err) |
| 338 | } else if err := dropFlowHandle.Get(ctx, nil); err != nil { |
| 339 | slog.ErrorContext(ctx, "DropFlow workflow did not execute successfully", logs, slog.Any("error", err)) |
| 340 | return fmt.Errorf("DropFlow workflow did not execute successfully: %w", err) |
| 341 | } |
| 342 | |
| 343 | return nil |
| 344 | } |
| 345 | |
| 346 | func (h *FlowRequestHandler) shutdownFlow( |
| 347 | ctx context.Context, |
no test coverage detected