| 338 | } |
| 339 | |
| 340 | func (c *Controller) processNextItem() bool { |
| 341 | // Wait until there is a new item in the working queue |
| 342 | resourceHandler, quit := c.queue.Get() |
| 343 | if quit { |
| 344 | return false |
| 345 | } |
| 346 | |
| 347 | c.collectors.SetQueueDepth(c.queue.Len()) |
| 348 | |
| 349 | // Tell the queue that we are done with processing this key. This unblocks the key for other workers |
| 350 | // This allows safe parallel processing because two events with the same key are never processed in |
| 351 | // parallel. |
| 352 | defer c.queue.Done(resourceHandler) |
| 353 | |
| 354 | // Record queue latency if the handler supports it |
| 355 | if h, ok := resourceHandler.(handler.TimedHandler); ok { |
| 356 | queueLatency := time.Since(h.GetEnqueueTime()) |
| 357 | c.collectors.RecordQueueLatency(queueLatency) |
| 358 | } |
| 359 | |
| 360 | // Track reconcile/handler duration |
| 361 | startTime := time.Now() |
| 362 | |
| 363 | // Invoke the method containing the business logic |
| 364 | rh, ok := resourceHandler.(handler.ResourceHandler) |
| 365 | if !ok { |
| 366 | logrus.Errorf("Invalid resource handler type: %T", resourceHandler) |
| 367 | // Clear rate-limiter state so the item doesn't leak memory in the queue. |
| 368 | c.queue.Forget(resourceHandler) |
| 369 | c.collectors.RecordError("invalid_handler_type") |
| 370 | return true |
| 371 | } |
| 372 | err := rh.Handle() |
| 373 | |
| 374 | duration := time.Since(startTime) |
| 375 | |
| 376 | if err != nil { |
| 377 | c.collectors.RecordReconcile("error", duration) |
| 378 | } else { |
| 379 | c.collectors.RecordReconcile("success", duration) |
| 380 | } |
| 381 | |
| 382 | // Handle the error if something went wrong during the execution of the business logic |
| 383 | c.handleErr(err, resourceHandler) |
| 384 | return true |
| 385 | } |
| 386 | |
| 387 | // handleErr checks if an error happened and makes sure we will retry later. |
| 388 | func (c *Controller) handleErr(err error, key interface{}) { |