MCPcopy
hub / github.com/apache/devlake / RunPipeline

Function RunPipeline

backend/core/runner/run_pipeline.go:31–55  ·  view source on GitHub ↗

RunPipeline FIXME ...

(
	basicRes context.BasicRes,
	pipelineId uint64,
	runTasks func([]uint64) errors.Error,
)

Source from the content-addressed store, hash-verified

29
30// RunPipeline FIXME ...
31func RunPipeline(
32 basicRes context.BasicRes,
33 pipelineId uint64,
34 runTasks func([]uint64) errors.Error,
35) errors.Error {
36 // load tasks for pipeline
37 db := basicRes.GetDal()
38 var tasks []models.Task
39 err := db.All(
40 &tasks,
41 dal.Where("pipeline_id = ? AND status in ?", pipelineId, []string{models.TASK_CREATED, models.TASK_RERUN, models.TASK_RESUME}),
42 dal.Orderby("pipeline_row, pipeline_col"),
43 )
44 if err != nil {
45 return err
46 }
47 taskIds := make([][]uint64, 0)
48 for _, task := range tasks {
49 for len(taskIds) < task.PipelineRow {
50 taskIds = append(taskIds, make([]uint64, 0))
51 }
52 taskIds[task.PipelineRow-1] = append(taskIds[task.PipelineRow-1], task.ID)
53 }
54 return runPipelineTasks(basicRes, pipelineId, taskIds, runTasks)
55}
56
57func runPipelineTasks(
58 basicRes context.BasicRes,

Callers

nothing calls this directly

Calls 3

runPipelineTasksFunction · 0.85
GetDalMethod · 0.65
AllMethod · 0.65

Tested by

no test coverage detected