SyncUpJobConfigs syncs up jobConfigs with the aresDB controller
()
| 275 | |
| 276 | // SyncUpJobConfigs syncs up jobConfigs with the aresDB controller. |
| 277 | func (c *Controller) SyncUpJobConfigs() { |
| 278 | c.Lock() |
| 279 | defer c.Unlock() |
| 280 | |
| 281 | // Check if the hash of the assignment is changed or not |
| 282 | updateHash, newAssignmentHash := c.updateAssignmentHash() |
| 283 | if !updateHash { |
| 284 | c.serviceConfig.Scope.Counter("syncUp.skipped").Inc(1) |
| 285 | return |
| 286 | } |
| 287 | |
| 288 | // Get assignment from aresDB controller since hash is changed |
| 289 | assigned, err := c.aresControllerClient.GetAssignment(c.jobNS, c.serviceConfig.Environment.InstanceID) |
| 290 | if err != nil { |
| 291 | c.serviceConfig.Logger.Error("Failed to get assignment from aresDB controller", |
| 292 | zap.String("jobNamespace", c.jobNS), |
| 293 | zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address), |
| 294 | zap.Error(err)) |
| 295 | c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1) |
| 296 | return |
| 297 | } |
| 298 | |
| 299 | assignment, err := rules.NewAssignmentFromController(assigned) |
| 300 | if err != nil { |
| 301 | c.serviceConfig.Logger.Error("Failed to populate assignment from controller assignment", |
| 302 | zap.String("jobNamespace", c.jobNS), |
| 303 | zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address), |
| 304 | zap.Error(err)) |
| 305 | c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1) |
| 306 | return |
| 307 | } |
| 308 | |
| 309 | c.serviceConfig.Logger.Info("Got assignment from aresDB controller", |
| 310 | zap.String("jobNamespace", c.jobNS), |
| 311 | zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address), |
| 312 | zap.String("activeAresNameSpace", config.ActiveAresNameSpace), |
| 313 | zap.Any("aresClusterNSConfig", c.serviceConfig.AresNSConfig), |
| 314 | zap.Any("activeAresClusters", c.serviceConfig.ActiveAresClusters), |
| 315 | zap.Any("assignement", assignment)) |
| 316 | |
| 317 | newJobs := make(map[string]*rules.JobConfig) |
| 318 | // Add or Update jobs |
| 319 | for _, jobConfig := range assignment.Jobs { |
| 320 | newJobs[jobConfig.Name] = jobConfig |
| 321 | if aresClusterDrivers, ok := c.Drivers[jobConfig.Name]; ok { |
| 322 | // case1: existing jobConfig |
| 323 | for aresCluster, driver := range aresClusterDrivers { |
| 324 | if _, ok := assignment.AresClusters[aresCluster]; !ok { |
| 325 | // case1.1: delete the driver because aresCluster is deleted |
| 326 | activeAresCluster, exist := c.serviceConfig.ActiveAresClusters[aresCluster] |
| 327 | if exist && activeAresCluster.GetSinkMode() != config.Sink_Kafka { |
| 328 | c.deleteDriver(driver, aresCluster, aresClusterDrivers) |
| 329 | c.serviceConfig.Logger.Info("deleted driver due to the removed aresCluster", |
| 330 | zap.String("job", jobConfig.Name), |
| 331 | zap.String("aresCluster", aresCluster)) |
| 332 | } |
| 333 | continue |
| 334 | } |
no test coverage detected