preprocess the node and send it to the playStageWorker
(ctx context.Context, node *corev1.Node)
| 274 | |
| 275 | // preprocess skips a node whose resource version is unchanged, matches it to a lifecycle stage, and sends it to the playStageWorker. |
| 276 | func (c *NodeController) preprocess(ctx context.Context, node *corev1.Node) error { |
| 277 | key := node.Name |
| 278 | |
| 279 | logger := log.FromContext(ctx) |
| 280 | logger = logger.With( |
| 281 | "node", key, |
| 282 | ) |
| 283 | |
| 284 | resourceJob, ok := c.delayQueueMapping.Load(key) |
| 285 | if ok { |
| 286 | if resourceJob.Resource.ResourceVersion == node.ResourceVersion { |
| 287 | logger.Debug("Skip node", |
| 288 | "reason", "resource version not changed", |
| 289 | "stage", resourceJob.Stage.Name(), |
| 290 | ) |
| 291 | return nil |
| 292 | } |
| 293 | } |
| 294 | |
| 295 | lc := c.lifecycle.Get() |
| 296 | |
| 297 | event := &lifecycle.Event{ |
| 298 | Labels: node.Labels, |
| 299 | Annotations: node.Annotations, |
| 300 | Data: node, |
| 301 | } |
| 302 | stage, err := lc.Match(ctx, event) |
| 303 | if err != nil { |
| 304 | return fmt.Errorf("stage match: %w", err) |
| 305 | } |
| 306 | if stage == nil { |
| 307 | logger.Debug("Skip node", |
| 308 | "reason", "not match any stages", |
| 309 | ) |
| 310 | return nil |
| 311 | } |
| 312 | |
| 313 | now := c.clock.Now() |
| 314 | delay, _, err := stage.Delay(ctx, event, now) |
| 315 | if err != nil { |
| 316 | logger.Warn("Failed to get delay", |
| 317 | "stage", stage.Name(), |
| 318 | "err", err, |
| 319 | ) |
| 320 | } |
| 321 | if delay != 0 { |
| 322 | stageName := stage.Name() |
| 323 | logger.Debug("Delayed play stage", |
| 324 | "delay", delay, |
| 325 | "stage", stageName, |
| 326 | ) |
| 327 | } |
| 328 | |
| 329 | item := resourceStageJob[*corev1.Node]{ |
| 330 | Resource: node, |
| 331 | Stage: stage, |
| 332 | Key: key, |
| 333 | RetryCount: new(uint64), |
no test coverage detected