( ctx context.Context, peer *protos.Peer, flowNames []string, )
| 1008 | } |
| 1009 | |
| 1010 | func (a *FlowableActivity) getRuntimeInfo( |
| 1011 | ctx context.Context, |
| 1012 | peer *protos.Peer, |
| 1013 | flowNames []string, |
| 1014 | ) (string, string, map[string]string) { |
| 1015 | logger := internal.LoggerFromCtx(ctx) |
| 1016 | version := "N/A" |
| 1017 | variant := "N/A" |
| 1018 | replicationMechanismInUseByFlow := make(map[string]string, len(flowNames)) |
| 1019 | |
| 1020 | conn, err := connectors.GetConnector(ctx, nil, peer) |
| 1021 | if err != nil { |
| 1022 | logger.Error("failed to create connector for source peer info", slog.String("peer", peer.Name), slog.Any("error", err)) |
| 1023 | return version, variant, replicationMechanismInUseByFlow |
| 1024 | } |
| 1025 | defer conn.Close() |
| 1026 | |
| 1027 | if vc, ok := conn.(connectors.GetVersionConnector); ok { |
| 1028 | if v, err := vc.GetVersion(ctx); err != nil { |
| 1029 | logger.Error("failed to get version", |
| 1030 | slog.String("peer", peer.Name), slog.Any("error", err)) |
| 1031 | } else { |
| 1032 | version = v |
| 1033 | } |
| 1034 | } |
| 1035 | if dvc, ok := conn.(connectors.DatabaseVariantConnector); ok { |
| 1036 | if v, err := dvc.GetDatabaseVariant(ctx); err != nil { |
| 1037 | logger.Error("failed to get database variant", |
| 1038 | slog.String("peer", peer.Name), slog.Any("error", err)) |
| 1039 | } else { |
| 1040 | variant = v.String() |
| 1041 | } |
| 1042 | } |
| 1043 | if rmc, ok := conn.(connectors.ReplicationMechanismInUseConnector); ok { |
| 1044 | for _, flowName := range flowNames { |
| 1045 | mechanismInUse, err := rmc.GetReplicationMechanismInUse(ctx, flowName) |
| 1046 | if err != nil { |
| 1047 | logger.Error("failed to get replication mechanism in use", |
| 1048 | slog.String("peer", peer.Name), |
| 1049 | slog.String("flowName", flowName), |
| 1050 | slog.Any("error", err)) |
| 1051 | continue |
| 1052 | } |
| 1053 | replicationMechanismInUseByFlow[flowName] = mechanismInUse |
| 1054 | } |
| 1055 | } |
| 1056 | |
| 1057 | return version, variant, replicationMechanismInUseByFlow |
| 1058 | } |
| 1059 | |
| 1060 | type flowInformation struct { |
| 1061 | config *protos.FlowConnectionConfigsCore |
no test coverage detected