(self)
| 64 | cls.config = base.get_e2e_configuration() |
| 65 | |
| 66 | def test_pod_apis(self): |
| 67 | client = api_client.ApiClient(configuration=self.config) |
| 68 | api = core_v1_api.CoreV1Api(client) |
| 69 | |
| 70 | name = 'busybox-test-' + short_uuid() |
| 71 | pod_manifest = manifest_with_command( |
| 72 | name, "while true;do date;sleep 5; done") |
| 73 | |
| 74 | # wait for the default service account to be created |
| 75 | timeout = time.time() + 30 |
| 76 | while True: |
| 77 | if time.time() > timeout: |
| 78 | print('timeout waiting for default service account creation') |
| 79 | break |
| 80 | try: |
| 81 | resp = api.read_namespaced_service_account(name='default', |
| 82 | namespace='default') |
| 83 | except ApiException as e: |
| 84 | if e.status != HTTPStatus.NOT_FOUND: |
| 85 | print('error: %s' % e) |
| 86 | self.fail( |
| 87 | msg="unexpected error getting default service account") |
| 88 | print('default service not found yet: %s' % e) |
| 89 | time.sleep(1) |
| 90 | continue |
| 91 | self.assertEqual('default', resp.metadata.name) |
| 92 | break |
| 93 | |
| 94 | resp = api.create_namespaced_pod(body=pod_manifest, |
| 95 | namespace='default') |
| 96 | self.assertEqual(name, resp.metadata.name) |
| 97 | self.assertTrue(resp.status.phase) |
| 98 | |
| 99 | while True: |
| 100 | resp = api.read_namespaced_pod(name=name, |
| 101 | namespace='default') |
| 102 | self.assertEqual(name, resp.metadata.name) |
| 103 | self.assertTrue(resp.status.phase) |
| 104 | if resp.status.phase != 'Pending': |
| 105 | break |
| 106 | time.sleep(1) |
| 107 | |
| 108 | exec_command = ['/bin/sh', |
| 109 | '-c', |
| 110 | 'for i in $(seq 1 3); do date; done'] |
| 111 | resp = stream(api.connect_get_namespaced_pod_exec, name, 'default', |
| 112 | command=exec_command, |
| 113 | stderr=False, stdin=False, |
| 114 | stdout=True, tty=False) |
| 115 | print('EXEC response : %s (%s)' % (repr(resp), type(resp))) |
| 116 | self.assertIsInstance(resp, str) |
| 117 | self.assertEqual(3, len(resp.splitlines())) |
| 118 | |
| 119 | exec_command = ['/bin/sh', |
| 120 | '-c', |
| 121 | 'echo -n "This is a test string" | gzip'] |
| 122 | resp = stream(api.connect_get_namespaced_pod_exec, name, 'default', |
| 123 | command=exec_command, |
nothing calls this directly
no test coverage detected