(runningParallelLabels []string)
| 251 | } |
| 252 | |
| 253 | func dequeuePipeline(runningParallelLabels []string) (pipeline *models.Pipeline, err errors.Error) { |
| 254 | txHelper := dbhelper.NewTxHelper(basicRes, &err) |
| 255 | defer txHelper.End() |
| 256 | tx := txHelper.Begin() |
| 257 | // MySQL read lock; not sure whether it works for PostgreSQL |
| 258 | errors.Must(tx.LockTables(dal.LockTables{ |
| 259 | {Table: "_devlake_pipelines", Exclusive: false}, |
| 260 | {Table: "_devlake_pipeline_labels", Exclusive: false}, |
| 261 | })) |
| 262 | // prepare query to find an appropriate pipeline to execute |
| 263 | pipeline = &models.Pipeline{} |
| 264 | // 1. find out the current highest priority in the queue |
| 265 | top_priority := 0 |
| 266 | var top_priorities []int |
| 267 | where_status := dal.Where("status IN ?", []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}) |
| 268 | err = tx.Pluck("priority", &top_priorities, dal.From(pipeline), where_status, dal.Orderby("priority DESC"), dal.Limit(1)) |
| 269 | if err != nil { |
| 270 | panic(err) |
| 271 | } |
| 272 | if len(top_priorities) > 0 { |
| 273 | top_priority = top_priorities[0] |
| 274 | } |
| 275 | // 2. pick the earliest runnable pipeline with the highest priority |
| 276 | err = tx.First(pipeline, |
| 277 | where_status, |
| 278 | dal.Where("priority = ?", top_priority), |
| 279 | dal.Join( |
| 280 | `left join _devlake_pipeline_labels ON |
| 281 | _devlake_pipeline_labels.pipeline_id = _devlake_pipelines.id AND |
| 282 | _devlake_pipeline_labels.name LIKE 'parallel/%' AND |
| 283 | _devlake_pipeline_labels.name in ?`, |
| 284 | runningParallelLabels, |
| 285 | ), |
| 286 | dal.Groupby("id"), |
| 287 | dal.Having("count(_devlake_pipeline_labels.name)=0"), |
| 288 | dal.Select("id"), |
| 289 | dal.Orderby("id ASC"), |
| 290 | dal.Limit(1), |
| 291 | ) |
| 292 | if err == nil { |
| 293 | // mark the pipeline running, now we want a write lock |
| 294 | if pipeline.BeganAt == nil { |
| 295 | now := time.Now() |
| 296 | pipeline.BeganAt = &now |
| 297 | globalPipelineLog.Info("resumed pipeline #%d", pipeline.ID) |
| 298 | } |
| 299 | errors.Must(tx.LockTables(dal.LockTables{{Table: "_devlake_pipelines", Exclusive: true}})) |
| 300 | err = tx.UpdateColumns(&models.Pipeline{}, []dal.DalSet{ |
| 301 | {ColumnName: "status", Value: models.TASK_RUNNING}, |
| 302 | {ColumnName: "message", Value: ""}, |
| 303 | {ColumnName: "began_at", Value: pipeline.BeganAt}, |
| 304 | }, dal.Where("id = ?", pipeline.ID)) |
| 305 | if err != nil { |
| 306 | panic(err) |
| 307 | } |
| 308 | |
| 309 | return |
| 310 | } |
no test coverage detected