hub / github.com/Tencent/CodeAnalysis / run

Method run

client/node/testrunner.py:39–78

Runs a single local test task.

(self)

Source from the content-addressed store, hash-verified

        self._origin_os_env = dict(os.environ)

    def run(self):
        """Run a single local test task.
        """
        task_dir, task_id = TaskDirCtl().acquire_task_dir()
        logger.debug('created task_dir: %s', task_dir)
        task_log = os.path.join(task_dir, 'task.log')
        task_request_file = os.path.join(task_dir, 'task_request.json')
        task_response_file = os.path.join(task_dir, 'task_response.json')
        with open(self._request_file, 'r') as fp:
            request = json.load(fp)
        request['task_id'] = task_id
        request['task_dir'] = task_dir
        request['task_params']['task_scene'] = TaskScene.TEST
        with open(task_request_file, 'w') as fp:
            json.dump(request, fp, indent=2)

        # Start the task
        task_name = request['task_name']
        task = Task(task_id, task_name, task_request_file, task_response_file, task_log, env=self._origin_os_env)
        task.start()
        self._running_task.append(task)

        # Wait for the task to finish
        while self._running_task:
            time.sleep(3)
            self._handle_exist_task()

        # Delete the temporary directory
        tmp_issue_dir = os.path.join(task_dir, "workdir/tmp_dir")
        if os.path.exists(tmp_issue_dir):
            PathMgr().rmpath(tmp_issue_dir)

        with open(task_response_file, 'r') as fp:
            task_response = json.load(fp)
        rt_code = task_response['status']
        if rt_code == 0:
            logger.info("%s task succeed." % task_name)
        else:
            logger.info("%s task fail, please check log file!" % task_name)
        return rt_code
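The file handling in `run` can be tried in isolation from the task machinery. The sketch below is illustrative only and is not code from the repository: `prepare_request`, `read_status`, `demo`, and the `TEST_SCENE` value are hypothetical names standing in for the request-stamping step, the final status read, and `TaskScene.TEST`, whose real value is not shown here.

```python
import json
import os
import tempfile

# Hypothetical stand-in for TaskScene.TEST; the real constant's value is not shown.
TEST_SCENE = "test"


def prepare_request(request_file, task_dir, task_id):
    """Copy a request file into task_dir, stamped with task-specific fields."""
    with open(request_file, "r") as fp:
        request = json.load(fp)
    request["task_id"] = task_id
    request["task_dir"] = task_dir
    request["task_params"]["task_scene"] = TEST_SCENE
    task_request_file = os.path.join(task_dir, "task_request.json")
    with open(task_request_file, "w") as fp:
        json.dump(request, fp, indent=2)
    return task_request_file


def read_status(task_response_file):
    """Return the 'status' field of a response file (0 is treated as success)."""
    with open(task_response_file, "r") as fp:
        return json.load(fp)["status"]


def demo():
    """Round-trip a made-up request and a made-up response through a temp dir."""
    with tempfile.TemporaryDirectory() as task_dir:
        source = os.path.join(task_dir, "source_request.json")
        with open(source, "w") as fp:
            json.dump({"task_name": "demo", "task_params": {}}, fp)
        prepare_request(source, task_dir, task_id=1)
        # In run(), the started task writes this file; here it is faked.
        response = os.path.join(task_dir, "task_response.json")
        with open(response, "w") as fp:
            json.dump({"status": 0}, fp)
        return read_status(response)
```

Separating the two file steps this way keeps them testable without starting a real task, at the cost of diverging from the single-method layout of the original.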

Callers

nothing calls this directly

Calls 9

start · Method · 0.95
TaskDirCtl · Class · 0.90
Task · Class · 0.90
acquire_task_dir · Method · 0.80
debug · Method · 0.80
dump · Method · 0.80
load · Method · 0.45
_handle_exist_task · Method · 0.45
info · Method · 0.45

Tested by

no test coverage detected
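One way the wait loop might be exercised without real tasks is with a test double and an injectable sleep function. This is a sketch under stated assumptions, not the repository's code: `FakeTask` and `wait_for_tasks` are hypothetical, and the body of `_handle_exist_task` is not shown on this page, so the sketch simply assumes it drops finished tasks from the running list.

```python
import time


class FakeTask:
    """Hypothetical stand-in for Task: reports done after a fixed number of polls."""

    def __init__(self, polls_until_done):
        self._remaining = polls_until_done

    def is_done(self):
        self._remaining -= 1
        return self._remaining <= 0


def wait_for_tasks(running, poll_interval=3.0, sleep=time.sleep):
    """Poll until every task in `running` has finished; return the poll count."""
    polls = 0
    while running:
        sleep(poll_interval)
        # Assumed behavior of _handle_exist_task: remove finished tasks in place.
        running[:] = [task for task in running if not task.is_done()]
        polls += 1
    return polls
```

Passing `sleep=lambda seconds: None` lets a test run the loop instantly, for example `wait_for_tasks([FakeTask(2)], sleep=lambda seconds: None)`.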