( ctx context.Context, flowJobName string, deleteStats bool, skipDestinationDrop bool, )
| 344 | } |
| 345 | |
| 346 | func (h *FlowRequestHandler) shutdownFlow( |
| 347 | ctx context.Context, |
| 348 | flowJobName string, |
| 349 | deleteStats bool, |
| 350 | skipDestinationDrop bool, |
| 351 | ) error { |
| 352 | workflowID, err := h.getWorkflowID(ctx, flowJobName) |
| 353 | if err != nil { |
| 354 | return err |
| 355 | } |
| 356 | |
| 357 | logs := slog.Group("shutdown-log", |
| 358 | slog.String(string(shared.FlowNameKey), flowJobName), |
| 359 | slog.String("workflowId", workflowID), |
| 360 | ) |
| 361 | |
| 362 | if err := h.handleCancelWorkflow(ctx, workflowID, ""); err != nil { |
| 363 | slog.ErrorContext(ctx, "unable to cancel workflow", logs, slog.Any("error", err)) |
| 364 | return fmt.Errorf("unable to wait for PeerFlow workflow to close: %w", err) |
| 365 | } |
| 366 | |
| 367 | isCdc, err := h.isCDCFlow(ctx, flowJobName) |
| 368 | if err != nil { |
| 369 | slog.ErrorContext(ctx, "unable to check if workflow is cdc", logs, slog.Any("error", err)) |
| 370 | return fmt.Errorf("unable to determine if workflow is cdc: %w", err) |
| 371 | } |
| 372 | var cdcConfig *protos.FlowConnectionConfigs |
| 373 | if isCdc { |
| 374 | cdcConfig, err = h.getFlowConfigFromCatalog(ctx, flowJobName) |
| 375 | if err != nil { |
| 376 | slog.ErrorContext(ctx, "unable to get cdc config from catalog", logs, slog.Any("error", err)) |
| 377 | return fmt.Errorf("unable to get cdc config from catalog: %w", err) |
| 378 | } |
| 379 | } |
| 380 | dropFlowWorkflowID := fmt.Sprintf("%s-dropflow-%s", flowJobName, uuid.New()) |
| 381 | workflowOptions := client.StartWorkflowOptions{ |
| 382 | ID: dropFlowWorkflowID, |
| 383 | TaskQueue: h.peerflowTaskQueueID, |
| 384 | TypedSearchAttributes: shared.NewSearchAttributes(flowJobName), |
| 385 | } |
| 386 | |
| 387 | dropFlowHandle, err := h.temporalClient.ExecuteWorkflow(ctx, workflowOptions, peerflow.DropFlowWorkflow, &protos.DropFlowInput{ |
| 388 | FlowJobName: flowJobName, |
| 389 | DropFlowStats: deleteStats, |
| 390 | FlowConnectionConfigs: pconv.FlowConnectionConfigsToCore(cdcConfig, 0), |
| 391 | SkipDestinationDrop: skipDestinationDrop, |
| 392 | // NOTE: Resync is not set in this literal, so it stays false here, even during a snapshot-only resync |
| 393 | }) |
| 394 | if err != nil { |
| 395 | slog.ErrorContext(ctx, "unable to start DropFlow workflow", logs, slog.Any("error", err)) |
| 396 | return fmt.Errorf("unable to start DropFlow workflow: %w", err) |
| 397 | } |
| 398 | |
| 399 | cancelCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) |
| 400 | defer cancel() |
| 401 | |
| 402 | errLatch := concurrency.NewLatch[error]() |
| 403 | go func() { |
no test coverage detected