MCPcopy Index your code
hub / github.com/kubernetes-client/python / WSClient

Class WSClient

kubernetes/base/stream/ws_client.py:51–296  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

49
50
51class WSClient:
def __init__(self, configuration, url, headers, capture_all, binary=False):
    """Create a websocket client that understands channels.

    The exec command maps each stream onto its own channel: for example,
    0 is stdin, 1 is stdout and 2 is stderr. Other API calls, such as
    port forwarding, can forward different pods' streams to different
    channels.

    configuration, url and headers are handed to create_websocket
    unchanged. capture_all selects whether self._all is a real in-memory
    buffer or an _IgnoredIO instance. binary selects bytes rather than
    str for self.newline and for that in-memory buffer.
    """
    self._connected = False
    self._channels = {}
    self._closed_channels = set()
    self.subprotocol = None
    self.binary = binary
    self.newline = b'\n' if self.binary else '\n'

    # Choose the buffer type for self._all.
    if not capture_all:
        self._all = _IgnoredIO()
    elif self.binary:
        self._all = BytesIO()
    else:
        self._all = StringIO()

    self.sock = create_websocket(configuration, url, headers)

    # Prefer the socket's own subprotocol attribute; if it is missing or
    # empty, fall back to scanning the headers the socket reports.
    self.subprotocol = getattr(self.sock, 'subprotocol', None)
    if not self.subprotocol and self.sock:
        response_headers = self.sock.getheaders()
        if response_headers:
            for header_name, header_value in response_headers.items():
                if header_name.lower() == 'sec-websocket-protocol':
                    self.subprotocol = header_value
                    break

    self._connected = True
    self._returncode = None
81
def peek_channel(self, channel, timeout=0):
    """Return what is buffered for a channel without consuming it.

    A channel that is marked closed and has nothing buffered yields the
    empty value straight away. Otherwise update() is called with the
    given timeout first, and whatever is buffered for the channel
    afterwards is returned. The empty value is b"" in binary mode and ""
    otherwise.
    """
    if channel not in self._channels and channel in self._closed_channels:
        return b"" if self.binary else ""

    self.update(timeout=timeout)

    try:
        return self._channels[channel]
    except KeyError:
        # Nothing arrived for this channel during the update.
        return b"" if self.binary else ""
91
def read_channel(self, channel, timeout=0):
    """Read and consume the data buffered for a channel.

    A channel that is marked closed and has nothing buffered yields the
    empty value (b"" in binary mode, "" otherwise). If data is already
    buffered it is removed from the buffer and returned. Otherwise the
    result of peek_channel() is returned, and anything that left in the
    buffer for this channel is discarded.
    """
    already_buffered = channel in self._channels
    if not already_buffered and channel in self._closed_channels:
        return b"" if self.binary else ""

    if already_buffered:
        return self._channels.pop(channel)

    data = self.peek_channel(channel, timeout)
    # Drop whatever the peek left behind so the data is consumed only once.
    self._channels.pop(channel, None)
    return data
103
104 def readline_channel(self, channel, timeout=None):
105 """Read a line from a channel."""
106 if timeout is None:
107 timeout = float("inf")
108 start = time.time()

Calls

no outgoing calls