MCPcopy Index your code
hub / github.com/kubernetes-client/python / _websocket_request

Function _websocket_request

kubernetes/base/stream/stream.py:20–46  ·  view source on GitHub ↗

Override the ApiClient.request method with an alternative websocket based method and call the supplied Kubernetes API method with that in place.

(websocket_request, force_kwargs, api_method, *args, **kwargs)

Source from the content-addressed store, hash-verified

18
19
20def _websocket_request(websocket_request, force_kwargs, api_method, *args, **kwargs):
21 """Override the ApiClient.request method with an alternative websocket based
22 method and call the supplied Kubernetes API method with that in place."""
23 if force_kwargs:
24 for kwarg, value in force_kwargs.items():
25 kwargs[kwarg] = value
26 api_client = api_method.__self__.api_client
27 # old generated code's api client has config. new ones has configuration
28 try:
29 configuration = api_client.configuration
30 except AttributeError:
31 configuration = api_client.config
32 prev_request = api_client.request
33 binary = kwargs.pop('binary', False)
34 try:
35 api_client.request = functools.partial(websocket_request, configuration, binary=binary)
36 out = api_method(*args, **kwargs)
37 # The api_client insists on converting this to a string using its representation, so we have
38 # to do this dance to strip it of the b' prefix and ' suffix, encode it byte-per-byte (latin1),
39 # escape all of the unicode \x*'s, then encode it back byte-by-byte
40 # However, if _preload_content=False is passed, then the entire WSClient is returned instead
41 # of a response, and we want to leave it alone
42 if binary and kwargs.get('_preload_content', True):
43 out = out[2:-1].encode('latin1').decode('unicode_escape').encode('latin1')
44 return out
45 finally:
46 api_client.request = prev_request
47
48
49stream = functools.partial(_websocket_request, ws_client.websocket_call, None)

Callers

nothing calls this directly

Calls 2

itemsMethod · 0.45
getMethod · 0.45

Tested by

no test coverage detected