()
| 79 | } |
| 80 | |
| 81 | func pipelineServiceInit() { |
| 82 | // initialize plugin |
| 83 | plugin.InitPlugins(basicRes) |
| 84 | |
| 85 | // notification |
| 86 | var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT") |
| 87 | var notificationSecret = cfg.GetString("NOTIFICATION_SECRET") |
| 88 | if strings.TrimSpace(notificationEndpoint) != "" { |
| 89 | defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret) |
| 90 | } |
| 91 | |
| 92 | // standalone mode: reset pipeline status |
| 93 | if cfg.GetBool("RESUME_PIPELINES") { |
| 94 | markInterruptedPipelineAs(models.TASK_RESUME) |
| 95 | } else { |
| 96 | markInterruptedPipelineAs(models.TASK_FAILED) |
| 97 | } |
| 98 | |
| 99 | // load cronjobs for blueprints |
| 100 | errors.Must(ReloadBlueprints()) |
| 101 | |
| 102 | var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL") |
| 103 | if pipelineMaxParallel < 0 { |
| 104 | panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL should be a positive integer`)) |
| 105 | } |
| 106 | if pipelineMaxParallel == 0 { |
| 107 | globalPipelineLog.Warn(nil, `pipelineMaxParallel=0 means pipeline will be run No Limit`) |
| 108 | pipelineMaxParallel = 10000 |
| 109 | } |
| 110 | // run pipeline with independent goroutine |
| 111 | if cfg.GetBool("CONSUME_PIPELINES") { |
| 112 | go RunPipelineInQueue(pipelineMaxParallel) |
| 113 | } |
| 114 | } |
| 115 | |
| 116 | func markInterruptedPipelineAs(status string) { |
| 117 | errors.Must(db.UpdateColumns( |
no test coverage detected