GetTopicsConfigs fetches all topic config options for the given set of topic names and config names and converts that information so that it is handy to use. Provide an empty array for configNames to describe all config entries.
(ctx context.Context, topicNames []string, configNames []string)
| 147 | // GetTopicsConfigs fetches all topic config options for the given set of topic names and config names and converts |
| 148 | // that information so that it is handy to use. Provide an empty array for configNames to describe all config entries. |
| 149 | func (s *Service) GetTopicsConfigs(ctx context.Context, topicNames []string, configNames []string) (map[string]*TopicConfig, error) { |
| 150 | cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx) |
| 151 | if err != nil { |
| 152 | return nil, err |
| 153 | } |
| 154 | |
| 155 | resources := make([]kmsg.DescribeConfigsRequestResource, len(topicNames)) |
| 156 | for i, topicName := range topicNames { |
| 157 | r := kmsg.DescribeConfigsRequestResource{ |
| 158 | ResourceType: kmsg.ConfigResourceTypeTopic, |
| 159 | ResourceName: topicName, |
| 160 | ConfigNames: configNames, |
| 161 | } |
| 162 | resources[i] = r |
| 163 | } |
| 164 | |
| 165 | req := kmsg.NewDescribeConfigsRequest() |
| 166 | req.Resources = resources |
| 167 | req.IncludeDocumentation = true |
| 168 | req.IncludeSynonyms = true |
| 169 | |
| 170 | response, err := req.RequestWith(ctx, cl) |
| 171 | if err != nil { |
| 172 | return nil, err |
| 173 | } |
| 174 | |
| 175 | // 3. Iterate through response's config entries and convert them into our desired format |
| 176 | converted := make(map[string]*TopicConfig, len(topicNames)) |
| 177 | for _, res := range response.Resources { |
| 178 | kafkaErr := newKafkaError(res.ErrorCode) |
| 179 | if kafkaErr != nil { |
| 180 | s.logger.WarnContext(ctx, "config resource response has an error", slog.String("resource_name", res.ResourceName), slog.Any("error", kafkaErr)) |
| 181 | } |
| 182 | |
| 183 | entries := make([]*TopicConfigEntry, len(res.Configs)) |
| 184 | for i, cfg := range res.Configs { |
| 185 | // Merge config extension into entry |
| 186 | extension := s.configExtensionsByName[cfg.Name] |
| 187 | entries[i] = NewTopicConfigEntry(cfg, extension) |
| 188 | } |
| 189 | |
| 190 | converted[res.ResourceName] = &TopicConfig{ |
| 191 | TopicName: res.ResourceName, |
| 192 | ConfigEntries: entries, |
| 193 | Error: kafkaErr, |
| 194 | } |
| 195 | } |
| 196 | |
| 197 | return converted, nil |
| 198 | } |
no test coverage detected