hub / github.com/kubernetes-client/python / portforward_commands

Function portforward_commands

examples/pod_portforward.py:60–186
(api_instance)

Source from the content-addressed store, hash-verified

def portforward_commands(api_instance):
    name = 'portforward-example'
    resp = None
    try:
        resp = api_instance.read_namespaced_pod(name=name,
                                                namespace='default')
    except ApiException as e:
        if e.status != 404:
            print(f"Unknown error: {e}")
            exit(1)

    if not resp:
        print(f"Pod {name} does not exist. Creating it...")
        pod_manifest = {
            'apiVersion': 'v1',
            'kind': 'Pod',
            'metadata': {
                'name': name
            },
            'spec': {
                'containers': [{
                    'image': 'nginx',
                    'name': 'nginx',
                }]
            }
        }
        api_instance.create_namespaced_pod(body=pod_manifest,
                                           namespace='default')
        while True:
            resp = api_instance.read_namespaced_pod(name=name,
                                                    namespace='default')
            if resp.status.phase != 'Pending':
                break
            time.sleep(1)
        print("Done.")

    pf = portforward(
        api_instance.connect_get_namespaced_pod_portforward,
        name, 'default',
        ports='80',
    )
    http = pf.socket(80)
    http.setblocking(True)
    http.sendall(b'GET / HTTP/1.1\r\n')
    http.sendall(b'Host: 127.0.0.1\r\n')
    http.sendall(b'Connection: close\r\n')
    http.sendall(b'Accept: */*\r\n')
    http.sendall(b'\r\n')
    response = b''
    while True:
        select.select([http], [], [])
        data = http.recv(1024)
        if not data:
            break
        response += data
    http.close()
    print(response.decode('utf-8'))
    error = pf.error(80)
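
The receive loop in the listing above (wait with `select`, `recv` up to 1024 bytes, stop on an empty read) can be tried without a cluster. The following is a minimal sketch, not part of the example file: `read_until_closed` is a hypothetical helper name, the optional `timeout` parameter is an addition the original loop does not have, and a local `socket.socketpair()` stands in for the forwarded pod socket.

```python
import select
import socket


def read_until_closed(sock, chunk_size=1024, timeout=None):
    """Collect bytes from sock until the peer closes the connection.

    Hypothetical helper mirroring the select/recv loop in the listing.
    timeout=None blocks indefinitely, as the original loop does.
    """
    chunks = []
    while True:
        # Wait until the socket is readable (or the optional timeout expires).
        readable, _, _ = select.select([sock], [], [], timeout)
        if not readable:
            raise TimeoutError("no data received before timeout")
        data = sock.recv(chunk_size)
        if not data:  # an empty read means the peer closed its end
            break
        chunks.append(data)
    return b"".join(chunks)


if __name__ == "__main__":
    # Local demonstration: a connected socket pair replaces the pod socket.
    client, server = socket.socketpair()
    server.sendall(b"HTTP/1.1 200 OK\r\n\r\nhello")
    server.close()
    print(read_until_closed(client).decode("utf-8"))
    client.close()
```

Collecting chunks in a list and joining once avoids repeated `bytes` concatenation. With the socket from the listing, the call would presumably be `read_until_closed(http)`, since the request sends `Connection: close` and the server is expected to close the stream after responding.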

Callers 1

main · Function · 0.85

Calls 6

socket · Method · 0.80
read_namespaced_pod · Method · 0.45
create_namespaced_pod · Method · 0.45
sleep · Method · 0.45
close · Method · 0.45
error · Method · 0.45

Tested by

no test coverage detected
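
One way the pod-wait step could be covered without a cluster is to factor the polling loop into a helper and drive it with a stub. This is a sketch under stated assumptions: `wait_until_not_pending` and `make_stub_reader` are hypothetical names that do not exist in the example file, and the `timeout` bound is an addition (the loop in `portforward_commands` polls indefinitely).

```python
import time
from types import SimpleNamespace


def wait_until_not_pending(read_pod, timeout=60.0, interval=1.0, sleep=time.sleep):
    """Poll read_pod() until the pod leaves the 'Pending' phase.

    Hypothetical helper: read_pod is any zero-argument callable returning an
    object with a .status.phase attribute. Raises TimeoutError if the pod is
    still Pending once `timeout` seconds have elapsed.
    """
    deadline = time.monotonic() + timeout
    while True:
        pod = read_pod()
        if pod.status.phase != 'Pending':
            return pod
        if time.monotonic() >= deadline:
            raise TimeoutError(f"pod still Pending after {timeout} seconds")
        sleep(interval)


def make_stub_reader(phases):
    """Hypothetical stand-in for read_namespaced_pod: returns the phases in order."""
    it = iter(phases)
    return lambda: SimpleNamespace(status=SimpleNamespace(phase=next(it)))


if __name__ == "__main__":
    reader = make_stub_reader(['Pending', 'Pending', 'Running'])
    # Inject a no-op sleep so the demonstration does not actually wait.
    pod = wait_until_not_pending(reader, sleep=lambda seconds: None)
    print(pod.status.phase)
```

Against a real cluster, the reader would presumably be `lambda: api_instance.read_namespaced_pod(name=name, namespace='default')`, matching the call in the function above.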