MCPcopy Index your code
hub / github.com/Tencent/CodeAnalysis / CodeLineReporter

Class CodeLineReporter

client/util/codecount/repoter.py:24–62  ·  view source on GitHub ↗

代码行上报类

Source from the content-addressed store, hash-verified

22
23
24class CodeLineReporter(threading.Thread):
25 """代码行上报类
26 """
27 def __init__(self, task_params):
28 threading.Thread.__init__(self)
29 self._task_params = task_params
30 self._task_scene = task_params.get('task_scene', None)
31
32 def run(self):
33 """统计代码行并向server上报
34
35 :param source_dir: 代码目录
36 :return:
37 """
38 # 本地项目无需上报,直接返回
39 if self._task_scene and self._task_scene in [TaskScene.LOCAL, TaskScene.TEST]:
40 return
41 try:
42 # 从param中获取server_url
43 server_url = self._task_params.get("server_url", None)
44 # 从param中获取token并解密
45 encrypted_token = self._task_params.get("token", None)
46 token = Crypto(settings.PASSWORD_KEY).decrypt(encrypted_token)
47
48 dog_server = RetryDogServer(server_url, token).get_api_server(retry_times=0)
49
50 job_id = self._task_params['job_id']
51 # 查询是否已经上报过
52 data = dog_server.get_job_code_line(self._task_params, job_id)
53 if data and data["code_line_num"]:
54 logger.info("已经上报过代码行数,本次分析无需上报.")
55 return
56 # 统计代码行 - 整个目录统计,不按照增量和过滤路径过滤,比较耗时
57 code_line_dict = CodeLineCount(self._task_params).run()
58 # 上报代码行数据
59 logger.info("上报代码行数: %s", code_line_dict)
60 dog_server.update_job_code_line(self._task_params, job_id, code_line_dict)
61 except Exception as err:
62 logger.exception('update_job_code_line fail: %s' % str(err))

Callers 1

load_source_dirMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected