MCPcopy
hub / github.com/PeerDB-io/peerdb / DropFlowSource

Method DropFlowSource

flow/activities/flowable.go:821–858  ·  view source on GitHub ↗
(ctx context.Context, req *protos.DropFlowActivityInput)

Source from the content-addressed store, hash-verified

819}
820
821func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropFlowActivityInput) error {
822 ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)
823 srcConn, srcClose, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, nil, a.CatalogPool, req.PeerName)
824 if err != nil {
825 if _, ok := errors.AsType[*exceptions.NotFoundError](err); ok {
826 logger := internal.LoggerFromCtx(ctx)
827 logger.Warn("peer missing, skipping", slog.String("peer", req.PeerName))
828 return nil
829 }
830 if _, ok := errors.AsType[*exceptions.AuthError](err); ok {
831 logger := internal.LoggerFromCtx(ctx)
832 logger.Warn("auth error, skipping to avoid triggering security tools", slog.String("peer", req.PeerName))
833 return nil
834 }
835 return a.Alerter.LogFlowError(ctx, req.FlowJobName,
836 exceptions.NewDropFlowError(fmt.Errorf("[DropFlowSource] failed to get source connector: %w", err)),
837 )
838 }
839 defer srcClose(ctx)
840
841 if err := srcConn.PullFlowCleanup(ctx, req.FlowJobName); err != nil {
842 if dnsErr, ok := errors.AsType[*net.DNSError](err); ok && dnsErr.IsNotFound {
843 a.Alerter.LogFlowWarning(ctx, req.FlowJobName, fmt.Errorf("[DropFlowSource] hostname not found, skipping: %w", err))
844 return nil
845 } else {
846 pullCleanupErr := exceptions.NewDropFlowError(fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err))
847 if !shared.IsSQLStateError(err, pgerrcode.ObjectInUse) {
848 // don't alert when PID active
849 _ = a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr)
850 }
851 return pullCleanupErr
852 }
853 }
854
855 a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up source peer replication objects.")
856
857 return nil
858}
859
860func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error {
861 ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName)

Callers

nothing calls this directly

Calls 6

WarnMethod · 0.80
LogFlowErrorMethod · 0.80
LogFlowWarningMethod · 0.80
PullFlowCleanupMethod · 0.65
StringMethod · 0.45
LogFlowInfoMethod · 0.45

Tested by

no test coverage detected