MCPcopy Index your code
hub / github.com/Tencent/CodeAnalysis / __init__

Method __init__

client/node/servertask/looprunner.py:37–57  ·  view source on GitHub ↗

构造函数

(self, args)

Source from the content-addressed store, hash-verified

35 """轮询任务执行器,通过轮询不断获取任务来执行
36 """
37 def __init__(self, args):
38 """构造函数
39 """
40 TaskRunner.__init__(self)
41
42 self._token = args.token
43 self._tag = args.tag
44 self._org_sid = args.org_sid
45 self._create_from = args.create_from if args.create_from else "codedog_client"
46 self._server_url = LocalConfig.get_server_url()
47 # 打印启动渠道和连接的sever地址
48 LogPrinter.info(f"start from {self._create_from}.")
49 LogPrinter.info("using server: %s" % self._server_url)
50 # 初始化与codedog服务器通信的api server实例
51 self._server = RetryDogServer(self._server_url, self._token).get_api_server()
52 self._get_task_interval = 10 # sec,获取任务频率
53
54 # 设置环境变量,标记是节点模式
55 os.environ["TaskScene"] = TaskScene.NORMAL
56 # 初始环境变量,保存下来,执行子进程时使用该环境变量,避免被污染
57 self._origin_os_env = dict(os.environ)
58
59 def _handle_exist_task(self):
60 """管理当前在执行的任务,如果任务结束,上传分析结果,并从self._running_task列表中删除

Callers

nothing calls this directly

Calls 4

RetryDogServerClass · 0.90
get_server_urlMethod · 0.80
infoMethod · 0.45
get_api_serverMethod · 0.45

Tested by

no test coverage detected