MCPcopy
hub / github.com/uber/aresdb / SyncUpJobConfigs

Method SyncUpJobConfigs

subscriber/common/job/controller.go:277–424  ·  view source on GitHub ↗

SyncUpJobConfigs sync up jobConfigs with aresDB controller

()

Source from the content-addressed store, hash-verified

275
276// SyncUpJobConfigs sync up jobConfigs with aresDB controller
277func (c *Controller) SyncUpJobConfigs() {
278 c.Lock()
279 defer c.Unlock()
280
281 // Check if the hash of the assignment is changed or not
282 updateHash, newAssignmentHash := c.updateAssignmentHash()
283 if !updateHash {
284 c.serviceConfig.Scope.Counter("syncUp.skipped").Inc(1)
285 return
286 }
287
288 // Get assignment from aresDB controller since hash is changed
289 assigned, err := c.aresControllerClient.GetAssignment(c.jobNS, c.serviceConfig.Environment.InstanceID)
290 if err != nil {
291 c.serviceConfig.Logger.Error("Failed to get assignment from aresDB controller",
292 zap.String("jobNamespace", c.jobNS),
293 zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
294 zap.Error(err))
295 c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1)
296 return
297 }
298
299 assignment, err := rules.NewAssignmentFromController(assigned)
300 if err != nil {
301 c.serviceConfig.Logger.Error("Failed to populate assignment from controller assignment",
302 zap.String("jobNamespace", c.jobNS),
303 zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
304 zap.Error(err))
305 c.serviceConfig.Scope.Counter("syncUp.failed").Inc(1)
306 return
307 }
308
309 c.serviceConfig.Logger.Info("Got assignment from aresDB controller",
310 zap.String("jobNamespace", c.jobNS),
311 zap.String("aresDB Controller", c.serviceConfig.ControllerConfig.Address),
312 zap.String("activeAresNameSpace", config.ActiveAresNameSpace),
313 zap.Any("aresClusterNSConfig", c.serviceConfig.AresNSConfig),
314 zap.Any("activeAresClusters", c.serviceConfig.ActiveAresClusters),
315 zap.Any("assignement", assignment))
316
317 newJobs := make(map[string]*rules.JobConfig)
318 // Add or Update jobs
319 for _, jobConfig := range assignment.Jobs {
320 newJobs[jobConfig.Name] = jobConfig
321 if aresClusterDrivers, ok := c.Drivers[jobConfig.Name]; ok {
322 // case1: existing jobConfig
323 for aresCluster, driver := range aresClusterDrivers {
324 if _, ok := assignment.AresClusters[aresCluster]; !ok {
325 // case1.1: delete the driver because aresCluster is deleted
326 activeAresCluster, exist := c.serviceConfig.ActiveAresClusters[aresCluster]
327 if exist && activeAresCluster.GetSinkMode() != config.Sink_Kafka {
328 c.deleteDriver(driver, aresCluster, aresClusterDrivers)
329 c.serviceConfig.Logger.Info("deleted driver due to the removed aresCluster",
330 zap.String("job", jobConfig.Name),
331 zap.String("aresCluster", aresCluster))
332 }
333 continue
334 }

Callers 3

NewControllerFunction · 0.95
StartControllerFunction · 0.80
controller_test.goFile · 0.80

Calls 11

updateAssignmentHashMethod · 0.95
deleteDriverMethod · 0.95
addDriverMethod · 0.95
GetSinkModeMethod · 0.80
GetAssignmentMethod · 0.65
ErrorMethod · 0.65
StringMethod · 0.65
InfoMethod · 0.65
LockMethod · 0.45
UnlockMethod · 0.45

Tested by

no test coverage detected