MCPcopy
hub / github.com/apache/devlake / runPipelineTasks

Function runPipelineTasks

backend/core/runner/run_pipeline.go:57–105  ·  view source on GitHub ↗
(
	basicRes context.BasicRes,
	pipelineId uint64,
	taskIds [][]uint64,
	runTasks func([]uint64) errors.Error,
)

Source from the content-addressed store, hash-verified

55}
56
57func runPipelineTasks(
58 basicRes context.BasicRes,
59 pipelineId uint64,
60 taskIds [][]uint64,
61 runTasks func([]uint64) errors.Error,
62) errors.Error {
63 db := basicRes.GetDal()
64 log := basicRes.GetLogger()
65 // load pipeline from db
66 dbPipeline := &models.Pipeline{}
67 err := db.First(dbPipeline, dal.Where("id = ?", pipelineId))
68 if err != nil {
69 return err
70 }
71
72 // if pipeline has been cancelled, just return.
73 if dbPipeline.Status == models.TASK_CANCELLED {
74 return nil
75 }
76
77 // This double for loop executes each set of tasks sequentially while
78 // executing the set of tasks concurrently.
79 for i, row := range taskIds {
80 // update stage
81 err = db.UpdateColumns(dbPipeline, []dal.DalSet{
82 {ColumnName: "status", Value: models.TASK_RUNNING},
83 {ColumnName: "stage", Value: i + 1},
84 })
85 if err != nil {
86 log.Error(err, "update pipeline state failed")
87 break
88 }
89 // run tasks in parallel
90 err = runTasks(row)
91 if err != nil {
92 log.Error(err, "run tasks failed")
93 if errors.Is(err, gocontext.Canceled) || !dbPipeline.SkipOnFail {
94 log.Info("return error")
95 return err
96 }
97 }
98 }
99 if dbPipeline.BeganAt != nil {
100 log.Info("pipeline finished in %d ms: %v", time.Now().UnixMilli()-dbPipeline.BeganAt.UnixMilli(), err)
101 } else {
102 log.Info("pipeline finished at %d ms: %v", time.Now().UnixMilli(), err)
103 }
104 return err
105}

Callers 1

RunPipelineFunction · 0.85

Calls 6

GetDalMethod · 0.65
GetLoggerMethod · 0.65
FirstMethod · 0.65
UpdateColumnsMethod · 0.65
ErrorMethod · 0.65
InfoMethod · 0.65

Tested by

no test coverage detected