MCPcopy
hub / github.com/treeverse/dvc / _check_blockers

Function _check_blockers

dvc/rwlock.py:76–131  ·  view source on GitHub ↗
(tmp_dir, lock, info, *, mode, waiters)

Source from the content-addressed store, hash-verified

74
75
def _check_blockers(tmp_dir, lock, info, *, mode, waiters):  # noqa: C901, PLR0912
    """Raise if a live process holds a ``mode`` lock overlapping ``waiters``.

    Every path recorded under ``lock[mode]`` that overlaps at least one path
    in ``waiters`` is inspected. Each holder of such a path other than
    ``info`` itself is classified by its PID:

    * PID still exists -> the holder is a real blocker.
    * PID no longer exists -> a warning is logged (once per PID) and the
      entry is queued to be handed to ``_release_read`` /
      ``_release_write`` after the scan.

    Args:
        tmp_dir: Only used to build the ``'<tmp_dir>/rwlock'`` hint in the
            error message.
        lock: Mapping indexed by ``mode``; ``lock[mode]`` maps a path to a
            single holder info or a list of holder infos. Each info is
            indexed with ``"pid"`` and ``"cmd"`` and must be JSON
            serializable.
        info: The caller's own holder info; entries equal to it are ignored.
        mode: ``"read"`` or ``"write"``. Any other value that is a key of
            ``lock`` is scanned, but stale entries are then not released.
        waiters: Paths the caller wants to lock. Iterated once per locked
            path, so it must be re-iterable (not a one-shot iterator).

    Raises:
        LockError: If at least one live blocker was found. The message names
            the waiter path that matched when the first live blocker was
            detected.
    """
    from .lock import LockError

    non_existing_pid = set()

    blockers = []
    # Waiter path associated with the first live blocker. The inner loop
    # variable cannot be used for the error message: by the time the scan
    # ends it reflects the *last* locked path examined, not the blocked one.
    blocked_path = None
    to_release = defaultdict(list)
    for path, infos in lock[mode].items():
        for waiter_path in waiters:
            if localfs.overlaps(waiter_path, path):
                break
        else:
            # No waiter overlaps this locked path; nothing to check.
            continue

        holders = infos if isinstance(infos, list) else [infos]
        for blocker in holders:
            if blocker == info:
                continue

            pid = int(blocker["pid"])

            if pid in non_existing_pid:
                # Already reported as dead; just queue this path as well.
                pass
            elif psutil.pid_exists(pid):
                if blocked_path is None:
                    blocked_path = waiter_path
                blockers.append(blocker)
                continue
            else:
                non_existing_pid.add(pid)
                cmd = blocker["cmd"]
                logger.warning(
                    (
                        "Process '%s' with (Pid %s), in RWLock-file '%s'"
                        " had been killed. Auto removed it from the lock file."
                    ),
                    cmd,
                    pid,
                    relpath(path),
                )
            # Group paths by holder; the JSON dump (sorted keys) serves as a
            # hashable, order-independent key for the info mapping.
            to_release[json.dumps(blocker, sort_keys=True)].append(path)

    # Release stale entries only after the scan so ``lock[mode]`` is not
    # touched while it is being iterated.
    for info_json, path_list in to_release.items():
        stale_info = json.loads(info_json)
        if mode == "read":
            _release_read(lock, stale_info, path_list)
        elif mode == "write":
            _release_write(lock, stale_info, path_list)

    if blockers:
        raise LockError(
            f"'{blocked_path}' is busy, it is being blocked by:\n"
            f"{_infos_to_str(blockers)}\n"
            "\n"
            "If there are no processes with such PIDs, you can manually "
            f"remove '{tmp_dir}/rwlock' and try again."
        )
132
133

Callers 1

rwlockFunction · 0.85

Calls 9

relpathFunction · 0.85
_release_readFunction · 0.85
_release_writeFunction · 0.85
LockErrorClass · 0.85
_infos_to_strFunction · 0.85
itemsMethod · 0.80
overlapsMethod · 0.80
appendMethod · 0.80
addMethod · 0.45

Tested by

no test coverage detected