MCPcopy
hub / github.com/PeerDB-io/peerdb / resyncByRecreatingFlow

Method resyncByRecreatingFlow

flow/cmd/handler.go:663–721  ·  view source on GitHub ↗
(
	ctx context.Context,
	flowName string,
	dropStats bool,
	cdcConfigUpdate *protos.CDCFlowConfigUpdate,
)

Source from the content-addressed store, hash-verified

661}
662
663func (h *FlowRequestHandler) resyncByRecreatingFlow(
664 ctx context.Context,
665 flowName string,
666 dropStats bool,
667 cdcConfigUpdate *protos.CDCFlowConfigUpdate,
668) error {
669 if underMaintenance, err := internal.PeerDBMaintenanceModeEnabled(ctx, nil); err != nil {
670 return fmt.Errorf("unable to get maintenance mode status: %w", err)
671 } else if underMaintenance {
672 return ErrUnderMaintenance
673 }
674
675 isCDC, err := h.isCDCFlow(ctx, flowName)
676 if err != nil {
677 return err
678 }
679 if !isCDC {
680 return fmt.Errorf("resync is only supported for CDC mirrors")
681 }
682 // getting config before dropping the flow since the flow entry is deleted unconditionally
683 config, err := h.getFlowConfigFromCatalog(ctx, flowName)
684 if err != nil {
685 return err
686 }
687 // getting tags before dropping the flow since the flow entry is deleted unconditionally
688 tags, err := alerting.GetTags(ctx, h.pool, flowName)
689 if err != nil {
690 return err
691 }
692
693 config.Resync = true
694 config.DoInitialSnapshot = true
695 // validate mirror first because once the mirror is dropped, there's no going back
696 if internalVersion, err := internal.PeerDBForceInternalVersion(ctx, config.Env); err != nil {
697 return NewInternalApiError(err)
698 } else {
699 config.Version = internalVersion
700 }
701 configCore := pconv.FlowConnectionConfigsToCore(config, 0)
702 internal.ApplySnapshotConfigOverrides(configCore, cdcConfigUpdate)
703
704 if _, err := h.validateCDCMirrorImpl(ctx, configCore, false); err != nil {
705 return err
706 }
707
708 if err := h.shutdownFlow(ctx, flowName, dropStats, false); err != nil {
709 return err
710 }
711
712 workflowID := getWorkflowID(config.FlowJobName)
713 if _, err := h.createCDCFlow(ctx, configCore, workflowID); err != nil {
714 return err
715 }
716
717 if err := alerting.UpdateTags(ctx, h.pool, flowName, tags); err != nil {
718 slog.WarnContext(ctx, "unable to restore tags after resync", slog.Any("error", err))
719 }
720 return nil

Callers 1

FlowStateChangeMethod · 0.95

Calls 7

isCDCFlowMethod · 0.95
validateCDCMirrorImplMethod · 0.95
shutdownFlowMethod · 0.95
createCDCFlowMethod · 0.95
NewInternalApiErrorFunction · 0.85
getWorkflowIDFunction · 0.85

Tested by

no test coverage detected