ApplyConfig updates the state as the new config requires. It stops and creates only those queues that have changes.
(conf *config.Config)
| 141 | // ApplyConfig updates the state as the new config requires. |
| 142 | // Only stop & create queues which have changes. |
| 143 | func (rws *WriteStorage) ApplyConfig(conf *config.Config) error { |
| 144 | rws.mtx.Lock() |
| 145 | defer rws.mtx.Unlock() |
| 146 | |
| 147 | // Remote write queues only need to change if the remote write config or |
| 148 | // external labels change. |
| 149 | externalLabelUnchanged := labels.Equal(conf.GlobalConfig.ExternalLabels, rws.externalLabels) |
| 150 | rws.externalLabels = conf.GlobalConfig.ExternalLabels |
| 151 | |
| 152 | newQueues := make(map[string]*QueueManager) |
| 153 | newHashes := []string{} |
| 154 | for _, rwConf := range conf.RemoteWriteConfigs { |
| 155 | hash, err := toHash(rwConf) |
| 156 | if err != nil { |
| 157 | return err |
| 158 | } |
| 159 | |
| 160 | // Don't allow duplicate remote write configs. |
| 161 | if _, ok := newQueues[hash]; ok { |
| 162 | return fmt.Errorf("duplicate remote write configs are not allowed, found duplicate for URL: %s", rwConf.URL) |
| 163 | } |
| 164 | |
| 165 | // Set the queue name to the config hash if the user has not set |
| 166 | // a name in their remote write config so we can still differentiate |
| 167 | // between queues that have the same remote write endpoint. |
| 168 | name := hash[:6] |
| 169 | if rwConf.Name != "" { |
| 170 | name = rwConf.Name |
| 171 | } |
| 172 | |
| 173 | c, err := NewWriteClient(name, &ClientConfig{ |
| 174 | URL: rwConf.URL, |
| 175 | WriteProtoMsg: rwConf.ProtobufMessage, |
| 176 | Timeout: rwConf.RemoteTimeout, |
| 177 | HTTPClientConfig: rwConf.HTTPClientConfig, |
| 178 | SigV4Config: rwConf.SigV4Config, |
| 179 | AzureADConfig: rwConf.AzureADConfig, |
| 180 | GoogleIAMConfig: rwConf.GoogleIAMConfig, |
| 181 | Headers: rwConf.Headers, |
| 182 | RetryOnRateLimit: rwConf.QueueConfig.RetryOnRateLimit, |
| 183 | RoundRobinDNS: rwConf.RoundRobinDNS, |
| 184 | }) |
| 185 | if err != nil { |
| 186 | return err |
| 187 | } |
| 188 | |
| 189 | queue, ok := rws.queues[hash] |
| 190 | if externalLabelUnchanged && ok { |
| 191 | // Update the client in case any secret configuration has changed. |
| 192 | queue.SetClient(c) |
| 193 | newQueues[hash] = queue |
| 194 | delete(rws.queues, hash) |
| 195 | continue |
| 196 | } |
| 197 | |
| 198 | // Redacted to remove any passwords in the URL (that are |
| 199 | // technically accepted but not recommended) since this is |
| 200 | // only used for metric labels. |