MCPcopy Index your code
hub / github.com/kubernetes-client/python / test_pod_apis

Method test_pod_apis

kubernetes/e2e_test/test_client.py:66–194  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

64 cls.config = base.get_e2e_configuration()
65
66 def test_pod_apis(self):
67 client = api_client.ApiClient(configuration=self.config)
68 api = core_v1_api.CoreV1Api(client)
69
70 name = 'busybox-test-' + short_uuid()
71 pod_manifest = manifest_with_command(
72 name, "while true;do date;sleep 5; done")
73
74 # wait for the default service account to be created
75 timeout = time.time() + 30
76 while True:
77 if time.time() > timeout:
78 print('timeout waiting for default service account creation')
79 break
80 try:
81 resp = api.read_namespaced_service_account(name='default',
82 namespace='default')
83 except ApiException as e:
84 if e.status != HTTPStatus.NOT_FOUND:
85 print('error: %s' % e)
86 self.fail(
87 msg="unexpected error getting default service account")
88 print('default service not found yet: %s' % e)
89 time.sleep(1)
90 continue
91 self.assertEqual('default', resp.metadata.name)
92 break
93
94 resp = api.create_namespaced_pod(body=pod_manifest,
95 namespace='default')
96 self.assertEqual(name, resp.metadata.name)
97 self.assertTrue(resp.status.phase)
98
99 while True:
100 resp = api.read_namespaced_pod(name=name,
101 namespace='default')
102 self.assertEqual(name, resp.metadata.name)
103 self.assertTrue(resp.status.phase)
104 if resp.status.phase != 'Pending':
105 break
106 time.sleep(1)
107
108 exec_command = ['/bin/sh',
109 '-c',
110 'for i in $(seq 1 3); do date; done']
111 resp = stream(api.connect_get_namespaced_pod_exec, name, 'default',
112 command=exec_command,
113 stderr=False, stdin=False,
114 stdout=True, tty=False)
115 print('EXEC response : %s (%s)' % (repr(resp), type(resp)))
116 self.assertIsInstance(resp, str)
117 self.assertEqual(3, len(resp.splitlines()))
118
119 exec_command = ['/bin/sh',
120 '-c',
121 'echo -n "This is a test string" | gzip']
122 resp = stream(api.connect_get_namespaced_pod_exec, name, 'default',
123 command=exec_command,

Callers

nothing calls this directly

Calls 15

create_namespaced_pod — Method · 0.95
read_namespaced_pod — Method · 0.95
delete_namespaced_pod — Method · 0.95
manifest_with_command — Function · 0.85
write_stdin — Method · 0.80
readline_stdout — Method · 0.80
peek_stderr — Method · 0.80
readline_stderr — Method · 0.80
peek_stdout — Method · 0.80
read_channel — Method · 0.80

Tested by

no test coverage detected