GetBrokerConfig retrieves a specifc broker's configurations.
(ctx context.Context, brokerID int32)
| 94 | |
| 95 | // GetBrokerConfig retrieves a specifc broker's configurations. |
| 96 | func (s *Service) GetBrokerConfig(ctx context.Context, brokerID int32) ([]BrokerConfigEntry, *rest.Error) { |
| 97 | cl, _, err := s.kafkaClientFactory.GetKafkaClient(ctx) |
| 98 | if err != nil { |
| 99 | return nil, errorToRestError(err) |
| 100 | } |
| 101 | |
| 102 | resourceReq := kmsg.NewDescribeConfigsRequestResource() |
| 103 | resourceReq.ResourceType = kmsg.ConfigResourceTypeBroker |
| 104 | resourceReq.ResourceName = strconv.Itoa(int(brokerID)) // Empty string for all brokers (only works for dynamic broker configs) |
| 105 | resourceReq.ConfigNames = nil // Nil requests all |
| 106 | |
| 107 | req := kmsg.NewDescribeConfigsRequest() |
| 108 | req.Resources = []kmsg.DescribeConfigsRequestResource{ |
| 109 | resourceReq, |
| 110 | } |
| 111 | req.IncludeSynonyms = true |
| 112 | req.IncludeDocumentation = true |
| 113 | |
| 114 | res, err := req.RequestWith(ctx, cl) |
| 115 | if err != nil { |
| 116 | return nil, &rest.Error{ |
| 117 | Err: fmt.Errorf("failed to request broker config: %w", err), |
| 118 | Status: http.StatusServiceUnavailable, |
| 119 | Message: fmt.Sprintf("Failed to request broker's config: %v", err.Error()), |
| 120 | IsSilent: false, |
| 121 | } |
| 122 | } |
| 123 | |
| 124 | // Resources should always be of length = 1 |
| 125 | for _, resource := range res.Resources { |
| 126 | err := newKafkaErrorWithDynamicMessage(resource.ErrorCode, resource.ErrorMessage) |
| 127 | if err != nil { |
| 128 | return nil, &rest.Error{ |
| 129 | Err: err, |
| 130 | Status: http.StatusServiceUnavailable, |
| 131 | Message: fmt.Sprintf("Failed to describe broker config resource: %v", err.Error()), |
| 132 | IsSilent: false, |
| 133 | } |
| 134 | } |
| 135 | |
| 136 | configEntries := make([]BrokerConfigEntry, len(resource.Configs)) |
| 137 | for j, cfg := range resource.Configs { |
| 138 | isDefaultValue := false |
| 139 | innerEntries := make([]BrokerConfigSynonym, len(cfg.ConfigSynonyms)) |
| 140 | for j, innerCfg := range cfg.ConfigSynonyms { |
| 141 | innerEntries[j] = BrokerConfigSynonym{ |
| 142 | Name: innerCfg.Name, |
| 143 | Value: innerCfg.Value, |
| 144 | Source: innerCfg.Source.String(), |
| 145 | } |
| 146 | if innerCfg.Source == kmsg.ConfigSourceDefaultConfig { |
| 147 | isDefaultValue = derefString(cfg.Value) == derefString(innerCfg.Value) |
| 148 | } |
| 149 | } |
| 150 | |
| 151 | var isExplicitlySet bool |
| 152 | if cfg.Source == kmsg.ConfigSourceUnknown { |
| 153 | // Kafka <v1.1 uses the IsDefault property. Since then it's been replaced by ConfigSource and defaults |
no test coverage detected