(ctx context.Context, req *protos.DropFlowActivityInput)
| 819 | } |
| 820 | |
| 821 | func (a *FlowableActivity) DropFlowSource(ctx context.Context, req *protos.DropFlowActivityInput) error { |
| 822 | ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName) |
| 823 | srcConn, srcClose, err := connectors.GetByNameAs[connectors.CDCPullConnectorCore](ctx, nil, a.CatalogPool, req.PeerName) |
| 824 | if err != nil { |
| 825 | if _, ok := errors.AsType[*exceptions.NotFoundError](err); ok { |
| 826 | logger := internal.LoggerFromCtx(ctx) |
| 827 | logger.Warn("peer missing, skipping", slog.String("peer", req.PeerName)) |
| 828 | return nil |
| 829 | } |
| 830 | if _, ok := errors.AsType[*exceptions.AuthError](err); ok { |
| 831 | logger := internal.LoggerFromCtx(ctx) |
| 832 | logger.Warn("auth error, skipping to avoid triggering security tools", slog.String("peer", req.PeerName)) |
| 833 | return nil |
| 834 | } |
| 835 | return a.Alerter.LogFlowError(ctx, req.FlowJobName, |
| 836 | exceptions.NewDropFlowError(fmt.Errorf("[DropFlowSource] failed to get source connector: %w", err)), |
| 837 | ) |
| 838 | } |
| 839 | defer srcClose(ctx) |
| 840 | |
| 841 | if err := srcConn.PullFlowCleanup(ctx, req.FlowJobName); err != nil { |
| 842 | if dnsErr, ok := errors.AsType[*net.DNSError](err); ok && dnsErr.IsNotFound { |
| 843 | a.Alerter.LogFlowWarning(ctx, req.FlowJobName, fmt.Errorf("[DropFlowSource] hostname not found, skipping: %w", err)) |
| 844 | return nil |
| 845 | } else { |
| 846 | pullCleanupErr := exceptions.NewDropFlowError(fmt.Errorf("[DropFlowSource] failed to clean up source: %w", err)) |
| 847 | if !shared.IsSQLStateError(err, pgerrcode.ObjectInUse) { |
| 848 | // don't alert when PID active |
| 849 | _ = a.Alerter.LogFlowError(ctx, req.FlowJobName, pullCleanupErr) |
| 850 | } |
| 851 | return pullCleanupErr |
| 852 | } |
| 853 | } |
| 854 | |
| 855 | a.Alerter.LogFlowInfo(ctx, req.FlowJobName, "Cleaned up source peer replication objects.") |
| 856 | |
| 857 | return nil |
| 858 | } |
| 859 | |
| 860 | func (a *FlowableActivity) DropFlowDestination(ctx context.Context, req *protos.DropFlowActivityInput) error { |
| 861 | ctx = context.WithValue(ctx, shared.FlowNameKey, req.FlowJobName) |
nothing calls this directly
no test coverage detected