MCPcopy Index your code
hub / github.com/borgbackup/borg / SecurityManager

Class SecurityManager

src/borg/cache.py:75–230  ·  view source on GitHub ↗

Tracks repositories. Ensures that nothing bad happens (repository swaps, replay attacks, unknown repositories, etc.). This is complicated by the cache being initially used for this, while only some commands actually use the cache, which meant that other commands did not perform

Source from the content-addressed store, hash-verified

73
74
75class SecurityManager:
76 """
77 Tracks repositories. Ensures that nothing bad happens (repository swaps,
78 replay attacks, unknown repositories, etc.).
79
80 This is complicated by the cache being initially used for this, while
81 only some commands actually use the cache, which meant that other commands
82 did not perform these checks.
83
84 Further complications were created by the cache being a cache, so it
85 could be legitimately deleted, which is annoying because Borg did not
86 recognize repositories after that.
87
88 Therefore, a second location, the security database (see get_security_dir),
89 was introduced, which stores this information. However, this means that
90 the code has to deal with a cache existing but no security database entry,
91 or inconsistencies between the security database and the cache which have to
92 be reconciled, and also with no cache existing but a security database entry.
93 """
94
95 def __init__(self, repository):
96 self.repository = repository
97 self.dir = Path(get_security_dir(repository.id_str, legacy=(repository.version == 1)))
98 self.key_type_file = self.dir / "key-type"
99 self.location_file = self.dir / "location"
100 self.manifest_ts_file = self.dir / "manifest-timestamp"
101
102 @staticmethod
103 def destroy(repository, path=None):
104 """Destroys the security directory for ``repository`` or at ``path``."""
105 path = path or get_security_dir(repository.id_str, legacy=(repository.version == 1))
106 if Path(path).exists():
107 shutil.rmtree(path)
108
109 def known(self):
110 return all(f.exists() for f in (self.key_type_file, self.location_file, self.manifest_ts_file))
111
112 def key_matches(self, key):
113 if not self.known():
114 return False
115 try:
116 with self.key_type_file.open() as fd:
117 type = fd.read()
118 return type == str(key.TYPE)
119 except OSError as exc:
120 logger.warning("Could not read/parse key type file: %s", exc)
121
122 def save(self, manifest, key):
123 logger.debug("security: saving state for %s to %s", self.repository.id_str, str(self.dir))
124 current_location = self.repository._location.canonical_path()
125 logger.debug("security: current location %s", current_location)
126 logger.debug("security: key type %s", str(key.TYPE))
127 logger.debug("security: manifest timestamp %s", manifest.timestamp)
128 with SaveFile(self.location_file) as fd:
129 fd.write(current_location)
130 with SaveFile(self.key_type_file) as fd:
131 fd.write(str(key.TYPE))
132 with SaveFile(self.manifest_ts_file) as fd:

Callers 2

assert_secureFunction · 0.85
__init__Method · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected