执行单个本地测试任务
(self)
| 37 | self._origin_os_env = dict(os.environ) |
| 38 | |
def run(self):
    """Run a single local test task and return its status code.

    The request read from ``self._request_file`` is copied into a newly
    acquired task directory with ``task_id``, ``task_dir`` and
    ``task_params['task_scene']`` (set to ``TaskScene.TEST``) filled in.
    A ``Task`` is then started and this method blocks until
    ``self._running_task`` becomes empty.

    Returns:
        The ``status`` value read from ``task_response.json`` in the task
        directory. ``0`` is logged as success, any other value as failure.
    """
    task_dir, task_id = TaskDirCtl().acquire_task_dir()
    logger.debug('created task_dir: %s', task_dir)
    task_log = os.path.join(task_dir, 'task.log')
    task_request_file = os.path.join(task_dir, 'task_request.json')
    task_response_file = os.path.join(task_dir, 'task_response.json')

    # Load the caller-supplied request and stamp it with this run's identity.
    with open(self._request_file, 'r') as fp:
        request = json.load(fp)
    request['task_id'] = task_id
    request['task_dir'] = task_dir
    request['task_params']['task_scene'] = TaskScene.TEST

    # Persist the amended request inside the task directory for the task to read.
    with open(task_request_file, 'w') as fp:
        json.dump(request, fp, indent=2)

    # Start the task.
    task_name = request['task_name']
    task = Task(task_id, task_name, task_request_file, task_response_file, task_log, env=self._origin_os_env)
    task.start()
    self._running_task.append(task)

    # Wait for the task to finish.
    # NOTE(review): presumably _handle_exist_task() removes finished tasks
    # from self._running_task; otherwise this loop never ends - confirm.
    while self._running_task:
        time.sleep(3)
        self._handle_exist_task()

    # Remove the temporary folder left under the task directory, if any.
    tmp_issue_dir = os.path.join(task_dir, "workdir/tmp_dir")
    if os.path.exists(tmp_issue_dir):
        PathMgr().rmpath(tmp_issue_dir)

    with open(task_response_file, 'r') as fp:
        task_response = json.load(fp)
    rt_code = task_response['status']

    # Pass task_name as a logger argument so formatting is deferred to the
    # logging framework, matching the debug call above.
    if rt_code == 0:
        logger.info("%s task succeed.", task_name)
    else:
        logger.info("%s task fail, please check log file!", task_name)
    return rt_code
nothing calls this directly
no test coverage detected