根据项目配置信息生成任务参数列表 :param proj_conf: 项目配置信息 :return: 当前可直接执行的任务参数列表
(self, proj_conf, path_filters)
| 218 | } |
| 219 | |
| 220 | def _generate_request(self, proj_conf, path_filters): |
| 221 | """ |
| 222 | 根据项目配置信息生成任务参数列表 |
| 223 | :param proj_conf: 项目配置信息 |
| 224 | :return: 当前可直接执行的任务参数列表 |
| 225 | """ |
| 226 | job_context = proj_conf["job_context"] |
| 227 | # 全量分析 |
| 228 | job_context["incr_scan"] = False |
| 229 | # 合并过滤路径 |
| 230 | new_path_filters = self._merge_path_filters(job_context["path_filters"], path_filters) |
| 231 | |
| 232 | task_list = proj_conf["tasks"] |
| 233 | |
| 234 | for task_request in task_list: |
| 235 | task_params = task_request.get("task_params") |
| 236 | task_params["incr_scan"] = False |
| 237 | task_params["scm_last_revision"] = "" |
| 238 | task_params["path_filters"] = new_path_filters |
| 239 | task_params["scm_url"] = self._scm_url |
| 240 | task_params["scm_type"] = self._scm_type |
| 241 | task_params["project_id"] = job_context.get("project_id") |
| 242 | |
| 243 | task_request["execute_processes"] = task_request["processes"] |
| 244 | task_request["private_processes"] = [] |
| 245 | |
| 246 | # 添加task_scene信息,标记任务运行场景 |
| 247 | task_request['task_params']['task_scene'] = TaskScene.LOCAL |
| 248 | # 添加 source_dir 和 scm_type |
| 249 | if self._source_dir: |
| 250 | task_request['task_params']['source_dir'] = self._source_dir |
| 251 | # 添加 task_dir 信息 |
| 252 | task_dir, task_id = TaskDirCtl().acquire_task_dir() |
| 253 | task_request['id'] = task_id |
| 254 | task_request['task_dir'] = task_dir |
| 255 | task_request['task_params']['task_id'] = task_id |
| 256 | |
| 257 | return task_list |
| 258 | |
| 259 | def _scan_project(self, execute_request_list): |
| 260 | """ |
no test coverage detected