MCPcopy Index your code
hub / github.com/kubernetes-client/python / Watch

Class Watch

kubernetes/base/watch/watch.py:79–245  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

77
78
79class Watch:
80
def __init__(self, return_type=None):
    """Set up a watch with no stream attached yet.

    :param return_type: optional override for the event object type.
        When truthy, ``get_return_type`` hands it back unchanged
        instead of inspecting the watched API function.
    """
    # Cleared here, set to True by stop().
    self._stop = False
    # Latest resourceVersion seen; unmarshal_event refreshes it from
    # BOOKMARK events that carry metadata.resourceVersion.
    self.resource_version = None
    self._raw_return_type = return_type
    self._api_client = client.ApiClient()
86
def stop(self):
    """Flag the watch as stopped and try to unblock an in-flight read.

    Always sets ``self._stop`` to True. If a response object is stored
    on ``self._resp``, additionally makes a best-effort attempt to shut
    down the raw socket underneath it; any failure during that attempt
    is swallowed on purpose.
    """
    self._stop = True

    # _resp is not created by __init__, so it may be absent or falsy.
    response = getattr(self, '_resp', None)
    if not response:
        return

    import socket
    try:
        # Python SSL/socket GIL workaround (HTTP/1.1): force-shutdown the
        # raw socket so a background thread blocked inside CPython's
        # ssl.read() recv_into call returns immediately. Calling close()
        # here instead could deadlock waiting on SSL socket locks held by
        # that blocked read. Per the original note, the response and
        # connection themselves are closed in the stream loop's finally
        # block (outside this method) once the loop exits.
        connection = getattr(response, 'connection', None)
        raw_sock = getattr(connection, 'sock', None) if connection else None
        if raw_sock:
            raw_sock.shutdown(socket.SHUT_RDWR)
    except Exception:
        # Best effort only: the socket may already be closed or gone.
        pass
103
104
def get_return_type(self, func):
    """Return the type name to use for objects delivered by the watch.

    An explicit type given to the constructor takes precedence.
    Otherwise the name reported by ``_find_return_type(func)`` is used,
    with a trailing ``TYPE_LIST_SUFFIX`` removed when present.

    :param func: the API function being watched.
    :return: the type name string (or the constructor override as-is).
    """
    override = self._raw_return_type
    if override:
        return override

    type_name = _find_return_type(func)
    if not type_name.endswith(TYPE_LIST_SUFFIX):
        return type_name
    # Drop the list suffix so the name refers to a single item.
    return type_name[:-len(TYPE_LIST_SUFFIX)]
112
def get_watch_argument_name(self, func):
    """Pick the keyword argument that switches ``func`` into streaming mode.

    :param func: the API function being watched.
    :return: ``'follow'`` when the function's documentation text
        contains ``PYDOC_FOLLOW_PARAM``, otherwise ``'watch'``.
    """
    doc_text = pydoc.getdoc(func)
    return 'follow' if PYDOC_FOLLOW_PARAM in doc_text else 'watch'
118
119 def unmarshal_event(self, data, return_type):
120 if not data or data.isspace():
121 return None
122 try:
123 js = json.loads(data)
124 js['raw_object'] = js['object']
125
126 if not return_type:
127 return js
128
129 if js['type'] == 'BOOKMARK':
130 # Extract and store resource_version from BOOKMARK event for
131 # efficiency. No deserialization as event can be incomplete.
132 if isinstance(js['object'], dict) and 'metadata' in js['object']:
133 metadata = js['object']['metadata']
134 if isinstance(metadata, dict) and 'resourceVersion' in metadata:
135 self.resource_version = metadata['resourceVersion']
136 elif js['type'] != 'ERROR':

Calls

no outgoing calls