MCPcopy Index your code
hub / github.com/apache/devlake / pipelineServiceInit

Function pipelineServiceInit

backend/server/services/pipeline.go:81–114  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

79}
80
81func pipelineServiceInit() {
82 // initialize plugin
83 plugin.InitPlugins(basicRes)
84
85 // notification
86 var notificationEndpoint = cfg.GetString("NOTIFICATION_ENDPOINT")
87 var notificationSecret = cfg.GetString("NOTIFICATION_SECRET")
88 if strings.TrimSpace(notificationEndpoint) != "" {
89 defaultNotificationService = NewDefaultPipelineNotificationService(notificationEndpoint, notificationSecret)
90 }
91
92 // standalone mode: reset pipeline status
93 if cfg.GetBool("RESUME_PIPELINES") {
94 markInterruptedPipelineAs(models.TASK_RESUME)
95 } else {
96 markInterruptedPipelineAs(models.TASK_FAILED)
97 }
98
99 // load cronjobs for blueprints
100 errors.Must(ReloadBlueprints())
101
102 var pipelineMaxParallel = cfg.GetInt64("PIPELINE_MAX_PARALLEL")
103 if pipelineMaxParallel < 0 {
104 panic(errors.BadInput.New(`PIPELINE_MAX_PARALLEL should be a positive integer`))
105 }
106 if pipelineMaxParallel == 0 {
107 globalPipelineLog.Warn(nil, `pipelineMaxParallel=0 means pipeline will be run No Limit`)
108 pipelineMaxParallel = 10000
109 }
110 // run pipeline with independent goroutine
111 if cfg.GetBool("CONSUME_PIPELINES") {
112 go RunPipelineInQueue(pipelineMaxParallel)
113 }
114}
115
116func markInterruptedPipelineAs(status string) {
117 errors.Must(db.UpdateColumns(

Callers 1

ExecuteMigrationFunction · 0.85

Calls 9

ReloadBlueprintsFunction · 0.85
RunPipelineInQueueFunction · 0.85
GetStringMethod · 0.80
GetBoolMethod · 0.80
GetInt64Method · 0.80
NewMethod · 0.65
WarnMethod · 0.65

Tested by

no test coverage detected