Background loop: list then watch, reconnect on errors. A full re-list is only performed when ``self._resource_version`` is ``None`` (first start or after a 410 Gone response). On all other reconnects the most recent ``resourceVersion`` is reused so that no events are missed and the API server does not need to send a full object snapshot.
(self)
| 236 | self._resource_version = rv or "0" |
| 237 | |
| 238 | def _run_loop(self): |
| 239 | """Background loop: list then watch, reconnect on errors. |
| 240 | |
| 241 | A full re-list is only performed when ``self._resource_version`` is |
| 242 | ``None`` (first start or after a 410 Gone response). On all other |
| 243 | reconnects the most recent ``resourceVersion`` is reused so that no |
| 244 | events are missed and the API server does not need to send a full |
| 245 | object snapshot. |
| 246 | """ |
| 247 | while not self._stop_event.is_set(): |
| 248 | # Full re-list only when we have no resource version to resume from. |
| 249 | if self._resource_version is None: |
| 250 | try: |
| 251 | self._initial_list() |
| 252 | except Exception as exc: |
| 253 | logger.exception("Error during initial list; retrying") |
| 254 | self._fire(ERROR, exc) |
| 255 | self._stop_event.wait(timeout=5) |
| 256 | continue |
| 257 | |
| 258 | # Watch loop |
| 259 | last_resync = time.monotonic() |
| 260 | self._watch = Watch() |
| 261 | kw = self._build_kwargs() |
| 262 | kw["resource_version"] = self._resource_version |
| 263 | # When a resync period is configured, set a matching server-side |
| 264 | # watch timeout so that the stream exits after resync_period seconds |
| 265 | # even if no events arrive. Without this, a quiet period longer |
| 266 | # than resync_period would never trigger a resync because the check |
| 267 | # below only runs when the generator yields an event. |
| 268 | if self._resync_period > 0: |
| 269 | kw["timeout_seconds"] = max(1, int(self._resync_period)) |
| 270 | try: |
| 271 | for event in self._watch.stream(self._list_func, **kw): |
| 272 | if self._stop_event.is_set(): |
| 273 | break |
| 274 | evt_type = event.get("type") |
| 275 | obj = event.get("object") |
| 276 | # Sync the most recent resource version from the Watch |
| 277 | # instance (updated by unmarshal_event before yielding). |
| 278 | # Do this before firing handlers so consumers that wake on |
| 279 | # an event immediately see the advanced resource version. |
| 280 | if self._watch is not None and self._watch.resource_version: |
| 281 | self._resource_version = self._watch.resource_version |
| 282 | if evt_type == ADDED: |
| 283 | self._cache._put(obj) |
| 284 | self._fire(ADDED, obj) |
| 285 | elif evt_type == MODIFIED: |
| 286 | self._cache._put(obj) |
| 287 | self._fire(MODIFIED, obj) |
| 288 | elif evt_type == DELETED: |
| 289 | self._cache._remove(obj) |
| 290 | self._fire(DELETED, obj) |
| 291 | elif evt_type == BOOKMARK: |
| 292 | # BOOKMARK events carry an updated resource version but |
| 293 | # no object state change; the Watch instance already |
| 294 | # records the new resource_version internally. |
| 295 | self._fire(BOOKMARK, event.get("raw_object", obj)) |
nothing calls this directly
no test coverage detected