MCPcopy Index your code
hub / github.com/Tencent/CodeAnalysis / TaskDirCtl

Class TaskDirCtl

client/node/common/taskdirmgr.py:20–52  ·  view source on GitHub ↗

本地task目录管理类

Source from the content-addressed store, hash-verified

18
19
20class TaskDirCtl(object):
21 """本地task目录管理类
22 """
23 def __init__(self):
24 self._task_dirs_root = settings.TASK_DIR
25
26 def acquire_task_dir(self, task_id=None, dirname_prefix="task_"):
27 """申请任务目录
28
29 :param task_id: 任务id
30 :param dirname_prefix: taskdir名称前缀
31 :return:任务目录路径
32 """
33 if task_id:
34 task_dir = os.path.join(self._task_dirs_root, f"{dirname_prefix}{task_id}")
35 if os.path.exists(task_dir):
36 suffix_no = 1
37 while True:
38 if not os.path.exists(os.path.join(self._task_dirs_root, f"{dirname_prefix}{task_id}_{suffix_no}")):
39 break
40 suffix_no += 1
41 task_dir = os.path.join(self._task_dirs_root, f"{dirname_prefix}{task_id}_{suffix_no}")
42 else: # 没有传task_id,根据本地task dir名称排序,创建一个新的
43 task_id = 1
44 while True:
45 if not os.path.exists(os.path.join(self._task_dirs_root, f"{dirname_prefix}{task_id}")):
46 break
47 task_id += 1
48 task_dir = os.path.join(self._task_dirs_root, f"{dirname_prefix}{task_id}")
49
50 # 此时task_dir尚未创建,需要新建目录
51 os.makedirs(task_dir)
52 return task_dir, task_id

Callers 5

runMethod · 0.90
runMethod · 0.90
_generate_requestMethod · 0.90

Calls

no outgoing calls

Tested by 1

runMethod · 0.72