(self)
| 69 | return outputs |
| 70 | |
| 71 | def test_log_server(self): |
| 72 | master_port = self.port |
| 73 | # start the master |
| 74 | master = Master(port=master_port) |
| 75 | th = threading.Thread(target=master.run) |
| 76 | th.start() |
| 77 | time.sleep(10) |
| 78 | |
| 79 | self.add_worker(n_cpu=4) |
| 80 | |
| 81 | cluster_addr = 'localhost:{}'.format(self.port) |
| 82 | outputs = self._connect_and_create_actor(cluster_addr) |
| 83 | |
| 84 | # Get status |
| 85 | status = master._get_status() |
| 86 | client_jobs = pickle.loads(status).get('client_jobs') |
| 87 | self.assertIsNotNone(client_jobs) |
| 88 | |
| 89 | # Get job id |
| 90 | client = get_global_client() |
| 91 | jobs = client_jobs.get(client.client_id) |
| 92 | self.assertIsNotNone(jobs) |
| 93 | |
| 94 | for job_id, log_server_addr in jobs.items(): |
| 95 | log_url = "http://{}/get-log".format(log_server_addr) |
| 96 | # Test response without job_id |
| 97 | r = requests.get(log_url) |
| 98 | self.assertEqual(r.status_code, 400) |
| 99 | # Test normal response |
| 100 | r = requests.get(log_url, params={'job_id': job_id}) |
| 101 | self.assertEqual(r.status_code, 200) |
| 102 | log_content = json.loads(r.text).get('log') |
| 103 | self.assertIsNotNone(log_content) |
| 104 | log_content = log_content.replace('\r\n', '\n') |
| 105 | self.assertIn(log_content, outputs) |
| 106 | |
| 107 | # Test download |
| 108 | download_url = "http://{}/download-log".format(log_server_addr) |
| 109 | r = requests.get(download_url, params={'job_id': job_id}) |
| 110 | self.assertEqual(r.status_code, 200) |
| 111 | log_content = r.text.replace('\r\n', '\n') |
| 112 | self.assertIn(log_content, outputs) |
| 113 | master.exit() |
| 114 | |
| 115 | def test_monitor_query_log_server(self): |
| 116 | master_port = self.port |
nothing calls this directly
no test coverage detected