hub / github.com/Tencent/CodeAnalysis / create_local_scan

Function create_local_scan

server/projects/main/apps/codeproj/core/base.py:25–83

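Start a scan. 1. Create the job and the Scan, then return job_id and scan_id. 2. Asynchronously fetch and update the job parameters. 3. Start the job. :param project: Project, the project :param creator: str, the creator :param scan_data: dict, scan data { "force_create": True/False, whether to force the scan to start; "incr_scan": True/False, whether to run an incremental scan } :param created_fr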

(project, creator, scan_data, created_from="codedog_client", **kwargs)
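The way `scan_data` is folded into the job context is easy to misread, so here is a minimal standalone sketch of just those lines from the source listing. `build_job_context` is a hypothetical helper name, and `SimpleNamespace` stands in for the project's label objects (only a `.name` attribute is assumed). It is meant to illustrate behaviour, not to replace the real function.

```python
from types import SimpleNamespace


def build_job_context(scan_data, created_from="codedog_client"):
    """Illustrative copy of the context-building lines in create_local_scan."""
    # pop() removes these keys from the caller's dict as a side effect.
    tag = scan_data.pop("tag", None)
    labels = scan_data.pop("labels", None)
    task_names = scan_data.pop("task_names", None)
    job_context = {
        "force_create": scan_data.get("force_create", False),
        # Only the literal True counts; truthy values such as "true" or 1 do not.
        "incr_scan": scan_data.get("incr_scan", True) is True,
        "created_from": created_from,
        "tag": str(tag) if tag else None,
        "scm_revision": scan_data.get("revision", None),
        "scm_last_revision": scan_data.get("last_revision", None),
        "labels": [label.name for label in labels] if labels else None,
    }
    # Remaining scan_data keys are merged last, so a raw "incr_scan" or
    # "force_create" value overwrites the normalised one computed above.
    job_context.update(**scan_data)
    return job_context, task_names


scan_data = {
    "tag": 7,
    "labels": [SimpleNamespace(name="nightly")],  # stand-in label object
    "task_names": ["lint"],
    "incr_scan": "true",  # a string, not the boolean True
    "revision": "abc123",
}
ctx, names = build_job_context(scan_data)
print(ctx["incr_scan"], ctx["scm_revision"], sorted(scan_data))
```

Two things stand out when running this: the caller's `scan_data` loses its `tag`, `labels`, and `task_names` keys, and the final `update` means the `is True` normalisation only takes effect when `incr_scan` is absent from `scan_data`. Callers that need a strict boolean may want to pass one explicitly.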

Source from the content-addressed store, hash-verified

def create_local_scan(project, creator, scan_data, created_from="codedog_client", **kwargs):
    """Start a scan.

    1. Create the job and the Scan, then return job_id and scan_id
    2. Asynchronously fetch and update the job parameters
    3. Start the job

    :param project: Project, the project
    :param creator: str, the creator
    :param scan_data: dict, scan data
        {
            "force_create": True/False, whether to force the scan to start
            "incr_scan": True/False, whether to run an incremental scan
        }
    :param created_from: web, creation source
    :param kwargs:
        - client_flag: boolean, client start flag
    :return: job ID, scan ID, and task infos (None when no task_names are given)
    """
    # These keys are removed from the caller's scan_data dict.
    tag = scan_data.pop("tag", None)
    labels = scan_data.pop("labels", None)
    task_names = scan_data.pop("task_names", None)
    job_context = {
        "force_create": scan_data.get("force_create", False),
        "incr_scan": scan_data.get("incr_scan", True) is True,
        "created_from": created_from,
        "tag": str(tag) if tag else None,
        "scm_revision": scan_data.get("revision", None),
        "scm_last_revision": scan_data.get("last_revision", None),
        "labels": [label.name for label in labels] if labels else None,
    }
    # Remaining scan_data keys are merged last and override the values above.
    job_context.update(**scan_data)
    job_manager = JobManager(project)
    # Log: "[Project: %s] Start initializing job, job parameters: %s"
    logger.info("[Project: %s] 开始初始化任务,任务参数: %s" % (project.id, json.dumps(job_context)))
    scan_type = job_manager.get_scan_type(scan_data, job_context)
    # client_flag is hard-coded to True here; kwargs is not consulted.
    job = job_manager.initialize_job(
        force_create=job_context["force_create"],
        creator=creator, created_from=created_from,
        client_flag=True
    )
    # Log: "[Project: %s] Job initialized, job ID: %s"
    logger.info("[Project: %s] 初始化任务完成,任务编号: %s" % (project.id, job.id))
    try:
        job.context = job_context
        job.save()
        job_manager.create_waiting_scan_on_analysis_server(job, scan_type)
        task_infos = None
        if task_names:
            task_infos = job_manager.init_tasks(job, task_names)
        # Log: "[Project: %s] Job created: %s, associated scan ID: %s"
        logger.info("[Project: %s] 创建任务完成: %s,关联扫描编号:%s" % (project.id, job.id, job.scan_id))
        return job.id, job.scan_id, task_infos
    except Exception as err:
        if isinstance(err, CDErrorBase):
            # Project-defined errors carry their own code and message.
            result_msg = err.msg
            result_code = err.code
        else:
            # Message: "Exception while creating scan: %s"
            result_msg = "创建扫描异常: %s" % str(err)
            result_code = errcode.E_SERVER_JOB_CREATE_ERROR
        JobCloseHandler.revoke_job(job, result_code=result_code, result_msg=result_msg)
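The `except` branch above maps any exception to a `(result_code, result_msg)` pair before revoking the job. A small sketch of that mapping in isolation follows. The `CDErrorBase` class here is a hypothetical stand-in that assumes only the `.code` and `.msg` attributes used in the listing, `E_SERVER_JOB_CREATE_ERROR` is given a placeholder value rather than the project's real constant, the fallback message is an English rendering of the original string, and `to_result` is an illustrative name.

```python
class CDErrorBase(Exception):
    """Hypothetical stand-in; the real class lives in the project."""

    def __init__(self, code, msg):
        super().__init__(msg)
        self.code = code
        self.msg = msg


# Placeholder value for illustration, not the real errcode constant.
E_SERVER_JOB_CREATE_ERROR = -1


def to_result(err):
    """Return (result_code, result_msg) the way the except branch derives them."""
    if isinstance(err, CDErrorBase):
        # Project-defined errors keep their own code and message.
        return err.code, err.msg
    # Anything else is reported under the generic job-creation error code.
    return E_SERVER_JOB_CREATE_ERROR, "Exception while creating scan: %s" % str(err)


print(to_result(CDErrorBase(42, "quota exceeded")))
print(to_result(ValueError("boom")))
```

Keeping the mapping in one place like this makes it straightforward to unit-test the two branches without constructing a job.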

Callers

nothing calls this directly

Calls 10

get_scan_type · Method · 0.95
initialize_job · Method · 0.95
init_tasks · Method · 0.95
JobManager · Class · 0.90
revoke_job · Method · 0.80
get · Method · 0.45
update · Method · 0.45
info · Method · 0.45
save · Method · 0.45

Tested by

no test coverage detected