| 472 | } |
| 473 | |
| 474 | func (c *Controller) startDriver( |
| 475 | jobConfig *rules.JobConfig, aresCluster string, aresClusterDrivers map[string]*Driver, stop bool) bool { |
| 476 | // 0. Clone jobConfig |
| 477 | clonedJobConfig, err := rules.CloneJobConfig(jobConfig, c.serviceConfig, aresCluster) |
| 478 | if err != nil { |
| 479 | c.serviceConfig.Logger.Error("Failed to copy job config", |
| 480 | zap.String("job", jobConfig.Name), |
| 481 | zap.String("aresCluster", aresCluster), |
| 482 | zap.Error(err)) |
| 483 | return false |
| 484 | } |
| 485 | |
| 486 | // 1. Stop the job driver |
| 487 | if stop { |
| 488 | aresClusterDrivers[aresCluster].Stop() |
| 489 | aresClusterDrivers[aresCluster] = nil |
| 490 | } |
| 491 | |
| 492 | // 2. create a new driver |
| 493 | driver, err := |
| 494 | NewDriver(clonedJobConfig, c.serviceConfig, c.aresControllerClient, NewStreamingProcessor, c.sinkInitFunc, c.consumerInitFunc, c.decoderInitFunc) |
| 495 | if err != nil { |
| 496 | c.serviceConfig.Logger.Error("Failed to create driver", |
| 497 | zap.String("job", jobConfig.Name), |
| 498 | zap.String("cluster", aresCluster), |
| 499 | zap.Error(err)) |
| 500 | return false |
| 501 | } |
| 502 | |
| 503 | // 3. Start the job driver |
| 504 | go driver.Start() |
| 505 | aresClusterDrivers[aresCluster] = driver |
| 506 | |
| 507 | return true |
| 508 | } |
| 509 | |
| 510 | func (c *Controller) startEtcdHBService(params Params) { |
| 511 | var err error |