MCPcopy
hub / github.com/kubernetes-sigs/kwok / preprocess

Method preprocess

pkg/kwok/controllers/node_controller.go:276–340  ·  view source on GitHub ↗

Preprocesses the node and sends it to the playStageWorker.

(ctx context.Context, node *corev1.Node)

Source retrieved from the content-addressed store and hash-verified

274
275// preprocess the node and send it to the playStageWorker
276func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) error {
277 key := node.Name
278
279 logger := log.FromContext(ctx)
280 logger = logger.With(
281 "node", key,
282 )
283
284 resourceJob, ok := c.delayQueueMapping.Load(key)
285 if ok {
286 if resourceJob.Resource.ResourceVersion == node.ResourceVersion {
287 logger.Debug("Skip node",
288 "reason", "resource version not changed",
289 "stage", resourceJob.Stage.Name(),
290 )
291 return nil
292 }
293 }
294
295 lc := c.lifecycle.Get()
296
297 event := &lifecycle.Event{
298 Labels: node.Labels,
299 Annotations: node.Annotations,
300 Data: node,
301 }
302 stage, err := lc.Match(ctx, event)
303 if err != nil {
304 return fmt.Errorf("stage match: %w", err)
305 }
306 if stage == nil {
307 logger.Debug("Skip node",
308 "reason", "not match any stages",
309 )
310 return nil
311 }
312
313 now := c.clock.Now()
314 delay, _, err := stage.Delay(ctx, event, now)
315 if err != nil {
316 logger.Warn("Failed to get delay",
317 "stage", stage.Name(),
318 "err", err,
319 )
320 }
321 if delay != 0 {
322 stageName := stage.Name()
323 logger.Debug("Delayed play stage",
324 "delay", delay,
325 "stage", stageName,
326 )
327 }
328
329 item := resourceStageJob[*corev1.Node]{
330 Resource: node,
331 Stage: stage,
332 Key: key,
333 RetryCount: new(uint64),

Callers 1

preprocessWorkerMethod · 0.95

Calls 11

addStageJobMethod · 0.95
FromContextFunction · 0.92
WithMethod · 0.80
DebugMethod · 0.80
NowMethod · 0.80
DelayMethod · 0.80
WarnMethod · 0.80
GetMethod · 0.65
LoadMethod · 0.45
NameMethod · 0.45
MatchMethod · 0.45

Tested by

no test coverage detected