MCPcopy Index your code
hub / github.com/kubernetes-client/python / stream

Method stream

kubernetes/base/watch/watch.py:151–245  ·  view source on GitHub ↗

Watch an API resource and stream the result back via a generator. Note that watching an API resource can expire. The method tries to resume automatically once from the last result, but if that last result is too old as well, an `ApiException` exception will be thrown with

(self, func, *args, **kwargs)

Source from the content-addressed store, hash-verified

149 return None
150
151 def stream(self, func, *args, **kwargs):
152 """Watch an API resource and stream the result back via a generator.
153
154 Note that watching an API resource can expire. The method tries to
155 resume automatically once from the last result, but if that last result
156 is too old as well, an `ApiException` exception will be thrown with
157 ``code`` 410. In that case you have to recover yourself, probably
158 by listing the API resource to obtain the latest state and then
159 watching from that state on by setting ``resource_version`` to
160 one returned from listing.
161
162 :param func: The API function pointer. Any parameter to the function
163 can be passed after this parameter.
164
165 :return: Event object with these keys:
166 'type': The type of event such as "ADDED", "DELETED", etc.
167 'raw_object': a dict representing the watched object.
168 'object': A model representation of raw_object. The name of
169 model will be determined based on
170 the func's doc string. If it cannot be determined,
171 'object' value will be the same as 'raw_object'.
172
173 Example:
174 v1 = kubernetes.client.CoreV1Api()
175 watch = kubernetes.watch.Watch()
176 for e in watch.stream(v1.list_namespace, resource_version=1127):
177 type_ = e['type']
178 object_ = e['object'] # object is one of type return_type
179 raw_object = e['raw_object'] # raw_object is a dict
180 ...
181 if should_stop:
182 watch.stop()
183 """
184
185 self._stop = False
186 return_type = self.get_return_type(func)
187 watch_arg = self.get_watch_argument_name(func)
188 kwargs[watch_arg] = True
189 kwargs['_preload_content'] = False
190 if 'resource_version' in kwargs:
191 self.resource_version = kwargs['resource_version']
192
193 # Do not attempt retries if user specifies a timeout.
194 # We want to ensure we are returning within that timeout.
195 disable_retries = ('timeout_seconds' in kwargs)
196 retry_after_410 = False
197 deserialize = kwargs.pop('deserialize', True)
198 while True:
199 resp = func(*args, **kwargs)
200 self._resp = resp
201 try:
202 for line in iter_resp_lines(resp):
203 # unmarshal when we are receiving events from watch,
204 # return raw string when we are streaming log
205 if watch_arg == "watch":
206 if deserialize:
207 event = self.unmarshal_event(line, return_type)
208 else:

Calls 5

get_return_typeMethod · 0.95
unmarshal_eventMethod · 0.95
iter_resp_linesFunction · 0.85
closeMethod · 0.45