MCPcopy
hub / github.com/borgbackup/borg / _get_locks

Method _get_locks

src/borg/storelocking.py:136–153  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

134 return False
135
136 def _get_locks(self):
137 locks = {}
138 try:
139 infos = list(self.store.list("locks"))
140 except ObjectNotFound:
141 return {}
142 for info in infos:
143 key = info.name
144 content = self.store.load(f"locks/{key}")
145 lock = json.loads(content.decode("utf-8"))
146 lock["key"] = key
147 lock["dt"] = datetime.datetime.fromisoformat(lock["time"])
148 if self._is_stale_lock(lock):
149 # ignore it and delete it (even if it is not from us)
150 self._delete_lock(key, ignore_not_found=True, update_last_refresh=self._is_our_lock(lock))
151 else:
152 locks[key] = lock
153 return locks
154
155 def _find_locks(self, *, only_exclusive=False, only_mine=False):
156 locks = self._get_locks()

Callers 3

_find_locksMethod · 0.95
break_lockMethod · 0.95

Calls 5

_is_stale_lockMethod · 0.95
_delete_lockMethod · 0.95
_is_our_lockMethod · 0.95
listMethod · 0.45
loadMethod · 0.45

Tested by 1