MCPcopy
hub / github.com/apache/devlake / dequeuePipeline

Function dequeuePipeline

backend/server/services/pipeline.go:253–320  ·  view source on GitHub ↗
(runningParallelLabels []string)

Source from the content-addressed store, hash-verified

251}
252
253func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, err errors.Error) {
254 txHelper := dbhelper.NewTxHelper(basicRes, &err)
255 defer txHelper.End()
256 tx := txHelper.Begin()
257 // mysql read lock, not sure if it works for postgresql
258 errors.Must(tx.LockTables(dal.LockTables{
259 {Table: "_devlake_pipelines", Exclusive: false},
260 {Table: "_devlake_pipeline_labels", Exclusive: false},
261 }))
262 // prepare query to find an appropriate pipeline to execute
263 pipeline = &models.Pipeline{}
264 // 1. find out the current highest priority in the queue
265 top_priority := 0
266 var top_priorities []int
267 where_status := dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME})
268 err = tx.Pluck("priority", &top_priorities, dal.From(pipeline), where_status, dal.Orderby("priority DESC"), dal.Limit(1))
269 if err != nil {
270 panic(err)
271 }
272 if len(top_priorities) > 0 {
273 top_priority = top_priorities[0]
274 }
275 // 2. pick the earlier runnable pipeline with the highest priority
276 err = tx.First(pipeline,
277 where_status,
278 dal.Where("priority = ?", top_priority),
279 dal.Join(
280 `left join _devlake_pipeline_labels ON
281 _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND
282 _devlake_pipeline_labels.name LIKE 'parallel/%' AND
283 _devlake_pipeline_labels.name in ?`,
284 runningParallelLabels,
285 ),
286 dal.Groupby("id"),
287 dal.Having("count(_devlake_pipeline_labels.name)=0"),
288 dal.Select("id"),
289 dal.Orderby("id ASC"),
290 dal.Limit(1),
291 )
292 if err == nil {
293 // mark the pipeline running, now we want a write lock
294 if pipeline.BeganAt == nil {
295 now := time.Now()
296 pipeline.BeganAt = &now
297 globalPipelineLog.Info("resumed pipeline #%d", pipeline.ID)
298 }
299 errors.Must(tx.LockTables(dal.LockTables{{Table: "_devlake_pipelines", Exclusive: true}}))
300 err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{
301 {ColumnName: "status", Value: models.TASK_RUNNING},
302 {ColumnName: "message", Value: ""},
303 {ColumnName: "began_at", Value: pipeline.BeganAt},
304 }, dal.Where("id = ?", pipeline.ID))
305 if err != nil {
306 panic(err)
307 }
308
309 return
310 }

Callers 1

RunPipelineInQueueFunction · 0.85

Calls 10

EndMethod · 0.80
BeginMethod · 0.65
LockTablesMethod · 0.65
PluckMethod · 0.65
FromMethod · 0.65
FirstMethod · 0.65
InfoMethod · 0.65
UpdateColumnsMethod · 0.65
IsErrorNotFoundMethod · 0.65
ErrorMethod · 0.65

Tested by

no test coverage detected