(ctx context.Context, gvksToRelist []schema.GroupVersionKind, stopCh <-chan struct{})
| 480 | } |
| 481 | |
| 482 | func (c *CacheManager) replayGVKs(ctx context.Context, gvksToRelist []schema.GroupVersionKind, stopCh <-chan struct{}) { |
| 483 | gvksSet := watch.NewSet() |
| 484 | gvksSet.Add(gvksToRelist...) |
| 485 | |
| 486 | for gvksSet.Size() != 0 { |
| 487 | gvkItems := gvksSet.Items() |
| 488 | |
| 489 | for _, gvk := range gvkItems { |
| 490 | select { |
| 491 | case <-ctx.Done(): |
| 492 | return |
| 493 | case <-stopCh: |
| 494 | return |
| 495 | default: |
| 496 | operation := func(ctx context.Context) (bool, error) { |
| 497 | select { |
| 498 | // make sure that the stop channel hasn't closed yet in order to stop |
| 499 | // the operation in the backoff retry-er earlier so we don't sync GVKs |
| 500 | // that we may not want to sync anymore. This also ensures that we exit |
| 501 | // the func as soon as possible. |
| 502 | case <-stopCh: |
| 503 | return true, nil |
| 504 | default: |
| 505 | if err := c.syncGVK(ctx, gvk); err != nil { |
| 506 | return false, err |
| 507 | } |
| 508 | return true, nil |
| 509 | } |
| 510 | } |
| 511 | |
| 512 | if err := wait.ExponentialBackoffWithContext(ctx, backoff, operation); err != nil { |
| 513 | log.Error(err, "internal: error listings gvk cache data", "gvk", gvk) |
| 514 | } else { |
| 515 | gvksSet.Remove(gvk) |
| 516 | } |
| 517 | } |
| 518 | } |
| 519 | |
| 520 | c.ReportSyncMetrics() |
| 521 | } |
| 522 | } |
| 523 | |
| 524 | // wipeCacheIfNeeded performs a cache wipe if there are any gvks needing to be removed |
| 525 | // from the cache or if the excluder has changed. It also marks which gvks need to be |
no test coverage detected