There are w.parallelizm instances of this function running at all times. Each one will: - Retrieve an item from the workqueue - For each batch of keys, call syncCb, which tells us whether the items have been updated -- If any have, then overwrite those items in the cache. What happens when the number of
(ctx context.Context)
| 310 | // * Plugin asks for update on item 2 - cache evicts item 1, stores 2 and returns it unchanged |
| 311 | // * Sync loop updates item 2, repeat |
| 312 | func (w *InMemoryAutoRefresh) sync(ctx context.Context) (err error) { |
| 313 | defer func() { |
| 314 | var isErr bool |
| 315 | rVal := recover() |
| 316 | if rVal == nil { |
| 317 | return |
| 318 | } |
| 319 | |
| 320 | if err, isErr = rVal.(error); isErr { |
| 321 | err = fmt.Errorf("worker panic'd and is shutting down. Error: %w with Stack: %v", err, string(debug.Stack())) |
| 322 | } else { |
| 323 | err = fmt.Errorf("worker panic'd and is shutting down. Panic value: %v with Stack: %v", rVal, string(debug.Stack())) |
| 324 | } |
| 325 | |
| 326 | logger.Error(ctx, err) |
| 327 | }() |
| 328 | |
| 329 | for { |
| 330 | select { |
| 331 | case <-ctx.Done(): |
| 332 | return nil |
| 333 | default: |
| 334 | batch, shutdown := w.workqueue.Get() |
| 335 | if shutdown { |
| 336 | logger.Debugf(ctx, "Shutting down worker") |
| 337 | return nil |
| 338 | } |
| 339 | // Since we create batches every time we sync, we will just remove the item from the queue here |
| 340 | // regardless of whether it succeeded the sync or not. |
| 341 | w.workqueue.Forget(batch) |
| 342 | w.workqueue.Done(batch) |
| 343 | |
| 344 | newBatch := make(Batch, 0, len(*batch)) |
| 345 | for _, b := range *batch { |
| 346 | itemID := b.GetID() |
| 347 | w.processing.Delete(itemID) |
| 348 | item, ok := w.lruMap.Get(itemID) |
| 349 | if !ok { |
| 350 | logger.Debugf(ctx, "item with id [%v] not found in cache", itemID) |
| 351 | continue |
| 352 | } |
| 353 | if item.(Item).IsTerminal() { |
| 354 | logger.Debugf(ctx, "item with id [%v] is terminal", itemID) |
| 355 | continue |
| 356 | } |
| 357 | newBatch = append(newBatch, b) |
| 358 | } |
| 359 | if len(newBatch) == 0 { |
| 360 | continue |
| 361 | } |
| 362 | |
| 363 | t := w.metrics.SyncLatency.Start() |
| 364 | updatedBatch, err := w.syncCb(ctx, newBatch) |
| 365 | |
| 366 | if err != nil { |
| 367 | w.metrics.SyncErrors.Inc() |
| 368 | logger.Errorf(ctx, "failed to get latest copy of a batch. Error: %v", err) |
| 369 | t.Stop() |
no test coverage detected