轮询任务执行器,通过轮询不断获取任务来执行
| 32 | |
| 33 | |
| 34 | class LoopRunner(TaskRunner): |
| 35 | """轮询任务执行器,通过轮询不断获取任务来执行 |
| 36 | """ |
| 37 | def __init__(self, args): |
| 38 | """构造函数 |
| 39 | """ |
| 40 | TaskRunner.__init__(self) |
| 41 | |
| 42 | self._token = args.token |
| 43 | self._tag = args.tag |
| 44 | self._org_sid = args.org_sid |
| 45 | self._create_from = args.create_from if args.create_from else "codedog_client" |
| 46 | self._server_url = LocalConfig.get_server_url() |
| 47 | # 打印启动渠道和连接的sever地址 |
| 48 | LogPrinter.info(f"start from {self._create_from}.") |
| 49 | LogPrinter.info("using server: %s" % self._server_url) |
| 50 | # 初始化与codedog服务器通信的api server实例 |
| 51 | self._server = RetryDogServer(self._server_url, self._token).get_api_server() |
| 52 | self._get_task_interval = 10 # sec,获取任务频率 |
| 53 | |
| 54 | # 设置环境变量,标记是节点模式 |
| 55 | os.environ["TaskScene"] = TaskScene.NORMAL |
| 56 | # 初始环境变量,保存下来,执行子进程时使用该环境变量,避免被污染 |
| 57 | self._origin_os_env = dict(os.environ) |
| 58 | |
| 59 | def _handle_exist_task(self): |
| 60 | """管理当前在执行的任务,如果任务结束,上传分析结果,并从self._running_task列表中删除 |
| 61 | """ |
| 62 | for task in self._running_task[:]: |
| 63 | if task.done: |
| 64 | LogPrinter.info('task %s with id %d is done', task.task_name, task.task_id) |
| 65 | # 从任务队列中删除 |
| 66 | self._running_task.remove(task) |
| 67 | # 上传结果到server |
| 68 | self._send_result(task) |
| 69 | # # 分析任务完成后,按照磁盘空间和创建时间清理数据 |
| 70 | # LogPrinter.info("clean data directory ...") |
| 71 | # SourceManager.del_old_file() |
| 72 | |
| 73 | def _terminate_task(self, task_id): |
| 74 | """kill task""" |
| 75 | for task in self._running_task[:]: |
| 76 | if task and task.task_id == task_id: |
| 77 | task.terminate() |
| 78 | self._running_task.remove(task) |
| 79 | LogPrinter.info('terminate task %d by server', task_id) |
| 80 | |
| 81 | def _send_result(self, task): |
| 82 | """ |
| 83 | 上传结果 |
| 84 | :param task: 任务实例 |
| 85 | :return: |
| 86 | """ |
| 87 | """send task result to server""" |
| 88 | if task.code is None: # 分析正常完成的情况 |
| 89 | with open(task.response_file, 'r') as fp: |
| 90 | task_response = json.load(fp) |
| 91 | code = task_response['status'] |