MCPcopy
hub / github.com/apache/devlake / RunPipelineInQueue

Function RunPipelineInQueue

backend/server/services/pipeline.go:323–377  ·  view source on GitHub ↗

RunPipelineInQueue query pipeline from db and run it in a queue

(pipelineMaxParallel int64)

Source from the content-addressed store, hash-verified

321
322// RunPipelineInQueue query pipeline from db and run it in a queue
323func RunPipelineInQueue(pipelineMaxParallel int64) {
324 sema := semaphore.NewWeighted(pipelineMaxParallel)
325 runningParallelLabels := []string{}
326 var runningParallelLabelLock sync.Mutex
327 var err error
328 for {
329 // start goroutine when sema lock ready and pipeline exist.
330 // to avoid read old pipeline, acquire lock before read exist pipeline
331 errors.Must(sema.Acquire(context.TODO(), 1))
332 globalPipelineLog.Info("get lock and wait next pipeline")
333 var dbPipeline *models.Pipeline
334 for {
335 dbPipeline, err = dequeuePipeline(runningParallelLabels)
336 if err == nil && dbPipeline != nil {
337 break
338 }
339 time.Sleep(time.Second)
340 }
341
342 err = fillPipelineDetail(dbPipeline)
343 if err != nil {
344 panic(err)
345 }
346 // add pipelineParallelLabels to runningParallelLabels
347 var pipelineParallelLabels []string
348 for _, dbLabel := range dbPipeline.Labels {
349 if strings.HasPrefix(dbLabel, `parallel/`) {
350 pipelineParallelLabels = append(pipelineParallelLabels, dbLabel)
351 }
352 }
353 runningParallelLabelLock.Lock()
354 runningParallelLabels = append(runningParallelLabels, pipelineParallelLabels...)
355 runningParallelLabelLock.Unlock()
356
357 go func(pipelineId uint64, parallelLabels []string) {
358 defer sema.Release(1)
359 defer func() {
360 runningParallelLabelLock.Lock()
361 runningParallelLabels = utils.SliceRemove(runningParallelLabels, parallelLabels...)
362 runningParallelLabelLock.Unlock()
363 globalPipelineLog.Info("finish pipeline #%d, now runningParallelLabels is %s", pipelineId, runningParallelLabels)
364 }()
365 globalPipelineLog.Info("run pipeline, %d, now running runningParallelLabels are %s", pipelineId, runningParallelLabels)
366 // Notify that the pipeline has started
367 err = NotifyExternal(pipelineId)
368 if err != nil {
369 globalPipelineLog.Error(err, "failed to send pipeline started notification for pipeline #%d", pipelineId)
370 }
371 err = runPipeline(pipelineId)
372 if err != nil {
373 globalPipelineLog.Error(err, "failed to run pipeline %d", pipelineId)
374 }
375 }(dbPipeline.ID, pipelineParallelLabels)
376 }
377}
378
379func getProjectName(pipeline *models.Pipeline) (string, errors.Error) {
380 if pipeline == nil {

Callers 1

pipelineServiceInitFunction · 0.85

Calls 7

dequeuePipelineFunction · 0.85
fillPipelineDetailFunction · 0.85
NotifyExternalFunction · 0.85
runPipelineFunction · 0.85
InfoMethod · 0.65
ReleaseMethod · 0.65
ErrorMethod · 0.65

Tested by

no test coverage detected