| 77 | |
| 78 | |
| 79 | class Watch: |
| 80 | |
def __init__(self, return_type=None):
    """Set up the initial state of a watch.

    :param return_type: optional explicit type for deserialized event
        objects. When truthy, ``get_return_type`` returns it as-is
        instead of inferring a type from the watched function.
    """
    # Caller-supplied override, kept exactly as given.
    self._raw_return_type = return_type
    # Stop flag; starts cleared and is set to True by stop().
    self._stop = False
    # Dedicated API client instance owned by this watch.
    self._api_client = client.ApiClient()
    # Most recent resource version seen; unknown until an event sets it
    # (e.g. BOOKMARK events handled in unmarshal_event).
    self.resource_version = None
| 86 | |
def stop(self):
    """Request that the watch stops and unblock any pending socket read.

    Sets the stop flag, then, if a response object is currently attached
    to this watch, makes a best-effort attempt to shut down the raw
    socket underneath it. Any failure during that attempt is ignored.
    """
    self._stop = True

    response = getattr(self, '_resp', None)
    if not response:
        # No active response: setting the flag is all there is to do.
        return

    import socket

    try:
        # Python SSL/socket GIL workaround: force-shutdown the raw socket
        # (HTTP/1.1) so that a background thread blocked in CPython's
        # ssl.read() / recv_into call returns immediately. This avoids a
        # deadlock where close() would hang waiting for SSL socket locks
        # held by the blocked read. Per the original note, the response
        # and connection themselves are closed by the stream loop's
        # cleanup once it exits, so they are deliberately not closed here.
        connection = getattr(response, 'connection', None)
        raw_socket = getattr(connection, 'sock', None) if connection else None
        if raw_socket:
            raw_socket.shutdown(socket.SHUT_RDWR)
    except Exception:
        # Deliberate best-effort: the socket may already be closed or
        # disconnected, and stop() must never raise because of that.
        pass
| 103 | |
| 104 | |
def get_return_type(self, func):
    """Return the type name used to deserialize watched objects.

    An explicit return type given at construction time takes precedence.
    Otherwise the type is looked up from ``func`` via
    ``_find_return_type``, and a trailing ``TYPE_LIST_SUFFIX`` is removed
    so that the result names the item type rather than the list type.

    :param func: the API function being watched.
    :return: the explicit override, or the inferred type name with any
        list suffix stripped.
    """
    explicit = self._raw_return_type
    if explicit:
        return explicit

    inferred = _find_return_type(func)
    if not inferred.endswith(TYPE_LIST_SUFFIX):
        return inferred
    # Drop the list suffix to obtain the element type.
    return inferred[:-len(TYPE_LIST_SUFFIX)]
| 112 | |
def get_watch_argument_name(self, func):
    """Return the keyword argument name that enables streaming for ``func``.

    The choice is made from the function's documentation text: if it
    contains ``PYDOC_FOLLOW_PARAM`` the argument is ``'follow'``,
    otherwise it is ``'watch'``.

    :param func: the API function whose documentation is inspected.
    :return: ``'follow'`` or ``'watch'``.
    """
    documentation = pydoc.getdoc(func)
    return 'follow' if PYDOC_FOLLOW_PARAM in documentation else 'watch'
| 118 | |
| 119 | def unmarshal_event(self, data, return_type): |
| 120 | if not data or data.isspace(): |
| 121 | return None |
| 122 | try: |
| 123 | js = json.loads(data) |
| 124 | js['raw_object'] = js['object'] |
| 125 | |
| 126 | if not return_type: |
| 127 | return js |
| 128 | |
| 129 | if js['type'] == 'BOOKMARK': |
| 130 | # Extract and store resource_version from BOOKMARK event for |
| 131 | # efficiency. No deserialization as event can be incomplete. |
| 132 | if isinstance(js['object'], dict) and 'metadata' in js['object']: |
| 133 | metadata = js['object']['metadata'] |
| 134 | if isinstance(metadata, dict) and 'resourceVersion' in metadata: |
| 135 | self.resource_version = metadata['resourceVersion'] |
| 136 | elif js['type'] != 'ERROR': |
no outgoing calls