replaceWatchSet looks at the gvksToSync and makes changes to the registrar's watch set. Assumes caller has lock. On error, actual watch state may not align with intended watch state.
(ctx context.Context)
| 175 | // replaceWatchSet looks at the gvksToSync and makes changes to the registrar's watch set. |
| 176 | // Assumes caller has lock. On error, actual watch state may not align with intended watch state. |
| 177 | func (c *CacheManager) replaceWatchSet(ctx context.Context) error { |
| 178 | newWatchSet := watch.SetFrom(c.gvksToSync.GVKs()) |
| 179 | |
| 180 | gvksToRemove := c.watchedSet.Difference(newWatchSet) |
| 181 | c.gvksToDeleteFromCache.AddSet(gvksToRemove) |
| 182 | |
| 183 | var err error |
| 184 | c.watchedSet.Replace(newWatchSet, func() { |
| 185 | // *Note the following steps are not transactional with respect to admission control |
| 186 | |
| 187 | // Important: dynamic watches update must happen *after* updating our watchSet. |
| 188 | // Otherwise, the sync controller will drop events for the newly watched kinds. |
| 189 | err = c.registrar.ReplaceWatch(ctx, newWatchSet.Items()) |
| 190 | }) |
| 191 | |
| 192 | if err != nil { |
| 193 | // account for any watches failing to remove |
| 194 | if f := watch.NewErrorList(); errors.As(err, &f) && !f.HasGeneralErr() { |
| 195 | removeGVKFailures := watch.SetFrom(f.RemoveGVKFailures()) |
| 196 | finallyRemoved := c.danglingWatches.Difference(removeGVKFailures) |
| 197 | |
| 198 | c.gvksToDeleteFromCache.AddSet(finallyRemoved) |
| 199 | c.danglingWatches.RemoveSet(finallyRemoved) |
| 200 | c.danglingWatches.AddSet(removeGVKFailures) |
| 201 | } else { |
| 202 | // defensively assume all watches that needed removal failed to be removed in the general error case |
| 203 | // also assume whatever watches were dangling are still dangling. |
| 204 | c.danglingWatches.AddSet(gvksToRemove) |
| 205 | } |
| 206 | |
| 207 | return err |
| 208 | } |
| 209 | |
| 210 | // if no error, it means no previously dangling watches are still dangling |
| 211 | c.gvksToDeleteFromCache.AddSet(c.danglingWatches) |
| 212 | c.danglingWatches = watch.NewSet() |
| 213 | |
| 214 | return nil |
| 215 | } |
| 216 | |
| 217 | // interpretErr determines if the passed-in error is general (not GVK-specific) and, |
| 218 | // if GVK-specific, returns the subset of the passed in GVKs that are included in the err. |