hub / github.com/kubernetes-client/python / _run_loop

Method _run_loop

kubernetes/informer/informer.py:238–342

Background loop: list then watch, reconnect on errors. A full re-list is only performed when ``self._resource_version`` is ``None`` (first start or after a 410 Gone response). On all other reconnects the most recent ``resourceVersion`` is reused so that no events are missed and the API server does not need to send a full object snapshot.
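
The resume rule described above can be tried out without a cluster. What follows is a minimal sketch, not the informer's implementation: `GoneError`, `run_loop`, `make_fakes`, and the scripted list/watch functions are hypothetical names, and the 410 Gone response is modeled as a plain exception.

```python
class GoneError(Exception):
    """Hypothetical stand-in for an HTTP 410 Gone response."""


def run_loop(list_func, watch_func, max_iterations):
    """Run a bounded list-then-watch loop and return a log of what happened.

    list_func() returns a resource version string.
    watch_func(resource_version) yields (event_type, resource_version)
    tuples and may raise GoneError.
    """
    log = []
    resource_version = None  # None means "nothing to resume from"
    for _ in range(max_iterations):
        # Full list only when there is no resource version to resume from.
        if resource_version is None:
            resource_version = list_func()
            log.append(("list", resource_version))
        try:
            for evt_type, rv in watch_func(resource_version):
                resource_version = rv  # advance before handling the event
                log.append((evt_type, rv))
        except GoneError:
            # Stored version is too old: force a full re-list next time.
            resource_version = None
            log.append(("gone", None))
    return log


def make_fakes(gone_on_watch_call):
    """Build scripted list/watch functions plus a call counter (assumed test doubles)."""
    calls = {"list": 0, "watch": 0}

    def fake_list():
        calls["list"] += 1
        return str(calls["list"] * 100)

    def fake_watch(resource_version):
        calls["watch"] += 1
        if calls["watch"] == gone_on_watch_call:
            raise GoneError()
        # One event per connection, one version newer than where we resumed.
        yield ("MODIFIED", str(int(resource_version) + 1))

    return fake_list, fake_watch, calls


fake_list, fake_watch, calls = make_fakes(gone_on_watch_call=3)
log = run_loop(fake_list, fake_watch, max_iterations=4)
for entry in log:
    print(entry)
```

In this run the second watch connection resumes from version "101" without listing again, and only the simulated 410 on the third connection triggers a second list.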

(self)

Source from the content-addressed store, hash-verified

        self._resource_version = rv or "0"

    def _run_loop(self):
        """Background loop: list then watch, reconnect on errors.

        A full re-list is only performed when ``self._resource_version`` is
        ``None`` (first start or after a 410 Gone response). On all other
        reconnects the most recent ``resourceVersion`` is reused so that no
        events are missed and the API server does not need to send a full
        object snapshot.
        """
        while not self._stop_event.is_set():
            # Full re-list only when we have no resource version to resume from.
            if self._resource_version is None:
                try:
                    self._initial_list()
                except Exception as exc:
                    logger.exception("Error during initial list; retrying")
                    self._fire(ERROR, exc)
                    self._stop_event.wait(timeout=5)
                    continue

            # Watch loop
            last_resync = time.monotonic()
            self._watch = Watch()
            kw = self._build_kwargs()
            kw["resource_version"] = self._resource_version
            # When a resync period is configured, set a matching server-side
            # watch timeout so that the stream exits after resync_period seconds
            # even if no events arrive. Without this, a quiet period longer
            # than resync_period would never trigger a resync because the check
            # below only runs when the generator yields an event.
            if self._resync_period > 0:
                kw["timeout_seconds"] = max(1, int(self._resync_period))
            try:
                for event in self._watch.stream(self._list_func, **kw):
                    if self._stop_event.is_set():
                        break
                    evt_type = event.get("type")
                    obj = event.get("object")
                    # Sync the most recent resource version from the Watch
                    # instance (updated by unmarshal_event before yielding).
                    # Do this before firing handlers so consumers that wake on
                    # an event immediately see the advanced resource version.
                    if self._watch is not None and self._watch.resource_version:
                        self._resource_version = self._watch.resource_version
                    if evt_type == ADDED:
                        self._cache._put(obj)
                        self._fire(ADDED, obj)
                    elif evt_type == MODIFIED:
                        self._cache._put(obj)
                        self._fire(MODIFIED, obj)
                    elif evt_type == DELETED:
                        self._cache._remove(obj)
                        self._fire(DELETED, obj)
                    elif evt_type == BOOKMARK:
                        # BOOKMARK events carry an updated resource version but
                        # no object state change; the Watch instance already
                        # records the new resource_version internally.
                        self._fire(BOOKMARK, event.get("raw_object", obj))
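
Two details of the watch loop above are easier to see in isolation: how the server-side timeout is derived from the resync period, and how each event type affects the cache. The following is a rough sketch under stated assumptions, not the informer's code: `watch_timeout`, `key_of`, and `apply_event` are hypothetical helpers, the cache is a plain dict, and keying by (namespace, name) is an assumption, since the real cache's `_put` and `_remove` are not shown here.

```python
ADDED, MODIFIED, DELETED, BOOKMARK = "ADDED", "MODIFIED", "DELETED", "BOOKMARK"


def watch_timeout(resync_period):
    """Return a timeout_seconds value for a resync period, or None if resync is off."""
    if resync_period > 0:
        # int() truncates, so a sub-second period would become 0 without max().
        return max(1, int(resync_period))
    return None


def key_of(obj):
    """Assumed cache key: (namespace, name) taken from the object's metadata."""
    meta = obj["metadata"]
    return (meta.get("namespace"), meta["name"])


def apply_event(cache, event):
    """Apply one watch event (a dict with "type" and "object") to a dict cache."""
    evt_type = event.get("type")
    obj = event.get("object")
    if evt_type in (ADDED, MODIFIED):
        cache[key_of(obj)] = obj
    elif evt_type == DELETED:
        cache.pop(key_of(obj), None)
    # BOOKMARK and unknown event types leave the cache untouched.
    return evt_type


cache = {}
pod = {"metadata": {"namespace": "default", "name": "web-0"}, "phase": "Pending"}
events = [
    {"type": ADDED, "object": pod},
    {"type": MODIFIED, "object": {**pod, "phase": "Running"}},
    {"type": BOOKMARK, "object": {"metadata": {"resourceVersion": "42"}}},
]
for event in events:
    apply_event(cache, event)
snapshot_after_bookmark = dict(cache)

apply_event(cache, {"type": DELETED, "object": pod})
```

The bookmark carries no object state, so the snapshot taken after it still holds the modified pod, and the delete then empties the cache. The `max(1, ...)` guard matters for periods below one second, where truncation alone would produce a timeout of 0.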

Callers

nothing calls this directly

Calls 10

_initial_list · Method · 0.95
_fire · Method · 0.95
_build_kwargs · Method · 0.95
Watch · Class · 0.90
stream · Method · 0.80
_put · Method · 0.80
_remove · Method · 0.80
get · Method · 0.45
warning · Method · 0.45
debug · Method · 0.45

Tested by

no test coverage detected