( ctx context.Context, flowName string, dropStats bool, cdcConfigUpdate *protos.CDCFlowConfigUpdate, )
| 661 | } |
| 662 | |
| 663 | func (h *FlowRequestHandler) resyncByRecreatingFlow( |
| 664 | ctx context.Context, |
| 665 | flowName string, |
| 666 | dropStats bool, |
| 667 | cdcConfigUpdate *protos.CDCFlowConfigUpdate, |
| 668 | ) error { |
| 669 | if underMaintenance, err := internal.PeerDBMaintenanceModeEnabled(ctx, nil); err != nil { |
| 670 | return fmt.Errorf("unable to get maintenance mode status: %w", err) |
| 671 | } else if underMaintenance { |
| 672 | return ErrUnderMaintenance |
| 673 | } |
| 674 | |
| 675 | isCDC, err := h.isCDCFlow(ctx, flowName) |
| 676 | if err != nil { |
| 677 | return err |
| 678 | } |
| 679 | if !isCDC { |
| 680 | return fmt.Errorf("resync is only supported for CDC mirrors") |
| 681 | } |
| 682 | // getting config before dropping the flow since the flow entry is deleted unconditionally |
| 683 | config, err := h.getFlowConfigFromCatalog(ctx, flowName) |
| 684 | if err != nil { |
| 685 | return err |
| 686 | } |
| 687 | // getting tags before dropping the flow since the flow entry is deleted unconditionally |
| 688 | tags, err := alerting.GetTags(ctx, h.pool, flowName) |
| 689 | if err != nil { |
| 690 | return err |
| 691 | } |
| 692 | |
| 693 | config.Resync = true |
| 694 | config.DoInitialSnapshot = true |
| 695 | // validate mirror first because once the mirror is dropped, there's no going back |
| 696 | if internalVersion, err := internal.PeerDBForceInternalVersion(ctx, config.Env); err != nil { |
| 697 | return NewInternalApiError(err) |
| 698 | } else { |
| 699 | config.Version = internalVersion |
| 700 | } |
| 701 | configCore := pconv.FlowConnectionConfigsToCore(config, 0) |
| 702 | internal.ApplySnapshotConfigOverrides(configCore, cdcConfigUpdate) |
| 703 | |
| 704 | if _, err := h.validateCDCMirrorImpl(ctx, configCore, false); err != nil { |
| 705 | return err |
| 706 | } |
| 707 | |
| 708 | if err := h.shutdownFlow(ctx, flowName, dropStats, false); err != nil { |
| 709 | return err |
| 710 | } |
| 711 | |
| 712 | workflowID := getWorkflowID(config.FlowJobName) |
| 713 | if _, err := h.createCDCFlow(ctx, configCore, workflowID); err != nil { |
| 714 | return err |
| 715 | } |
| 716 | |
| 717 | if err := alerting.UpdateTags(ctx, h.pool, flowName, tags); err != nil { |
| 718 | slog.WarnContext(ctx, "unable to restore tags after resync", slog.Any("error", err)) |
| 719 | } |
| 720 | return nil |
no test coverage detected