MCPcopy Index your code
hub / github.com/flyteorg/flyte / Start

Method Start

flyteplugins/go/tasks/pluginmachinery/workqueue/queue.go:185–251  ·  view source on GitHub ↗
(ctx context.Context)

Source from the content-addressed store, hash-verified

183}
184
185func (q *queue) Start(ctx context.Context) error {
186 q.wlock.Lock()
187 defer q.wlock.Unlock()
188
189 if q.started {
190 return fmt.Errorf("queue already started")
191 }
192
193 for i := 0; i < q.workers; i++ {
194 go func(ctx context.Context) {
195 for {
196 select {
197 case <-ctx.Done():
198 logger.Debug(ctx, "Context cancelled. Shutting down.")
199 return
200 default:
201 item, shutdown := q.queue.Get()
202 if shutdown {
203 logger.Debug(ctx, "Work queue is shutting down.")
204 return
205 }
206
207 wrapperV := item.Clone()
208 wrapper := &wrapperV
209 ws := wrapper.status
210 var err error
211
212 func() {
213 defer func() {
214 if e, ok := recover().(error); ok {
215 logger.Errorf(ctx, "Worker panic'd while processing item [%v]. Error: %v", wrapper.id, e)
216 err = e
217 }
218 }()
219
220 ctxWithFields := contextWithValues(ctx, wrapper.logFields)
221 ws, err = q.processor.Process(ctxWithFields, wrapper.payload)
222 }()
223
224 if err != nil {
225 q.metrics.ProcessorErrors.Inc()
226
227 wrapper.retryCount++
228 wrapper.err = err
229 if wrapper.retryCount >= uint(q.maxRetries) {
230 logger.Debugf(ctx, "WorkItem [%v] exhausted all retries. Last Error: %v.",
231 wrapper.ID(), err)
232 wrapper.status = WorkStatusFailed
233 ws = WorkStatusFailed
234 q.index.Add(wrapper)
235 continue
236 }
237 }
238
239 wrapper.status = ws
240 q.index.Add(wrapper)
241 if !ws.IsTerminal() {
242 q.queue.Add(wrapper)

Callers

nothing calls this directly

Calls 13

DebugFunction · 0.92
ErrorfFunction · 0.92
DebugfFunction · 0.92
WithGoroutineLabelFunction · 0.92
contextWithValuesFunction · 0.85
ErrorfMethod · 0.80
GetMethod · 0.65
ProcessMethod · 0.65
IDMethod · 0.65
AddMethod · 0.65
IsTerminalMethod · 0.65
CloneMethod · 0.45

Tested by

no test coverage detected