MCPcopy
hub / github.com/pex-tool/pex / sync

Method sync

pex/cli/commands/lock.py:263–376  ·  view source on GitHub ↗
(
        self,
        distributions,  # type: Iterable[Distribution]
        confirm=True,  # type: bool
        retain_pip=False,  # type: bool
    )

Source from the content-addressed store, hash-verified

261 command = attr.ib(default=None) # type: Optional[Tuple[str, ...]]
262
263 def sync(
264 self,
265 distributions, # type: Iterable[Distribution]
266 confirm=True, # type: bool
267 retain_pip=False, # type: bool
268 ):
269 # type: (...) -> Result
270
271 abs_venv_dir = os.path.realpath(self.venv.venv_dir)
272
273 existing_distributions_by_project_name = {
274 dist.metadata.project_name: dist for dist in self.venv.iter_distributions()
275 } # type: Dict[ProjectName, Distribution]
276 installed_pip = existing_distributions_by_project_name.get(
277 self._PIP_PROJECT_NAME
278 ) # type: Optional[Distribution]
279
280 resolved_pip = None # type: Optional[Distribution]
281 to_remove = [] # type: List[Distribution]
282 to_install = [] # type: List[Distribution]
283 for distribution in distributions:
284 if self._PIP_PROJECT_NAME == distribution.metadata.project_name:
285 resolved_pip = distribution
286 existing_distribution = existing_distributions_by_project_name.pop(
287 distribution.metadata.project_name, None
288 )
289 if not existing_distribution:
290 to_install.append(distribution)
291 elif existing_distribution.metadata.version != distribution.metadata.version:
292 to_remove.append(existing_distribution)
293 to_install.append(distribution)
294 if retain_pip:
295 existing_distributions_by_project_name.pop(self._PIP_PROJECT_NAME, None)
296 to_remove.extend(existing_distributions_by_project_name.values())
297
298 to_unlink_by_pin = (
299 OrderedDict()
300 ) # type: OrderedDict[Tuple[ProjectName, Version], List[Text]]
301 for distribution in to_remove:
302 to_unlink = [] # type: List[Text]
303 if distribution.metadata.type is MetadataType.DIST_INFO:
304 to_unlink.extend(
305 os.path.realpath(os.path.join(distribution.location, installed_file.path))
306 for installed_file in Record.read(
307 lines=distribution.iter_metadata_lines("RECORD")
308 )
309 if isinstance(installed_file, InstalledFile)
310 )
311 elif distribution.metadata.type is MetadataType.EGG_INFO:
312 installed_files = distribution.metadata.files.metadata_file_rel_path(
313 "installed-files.txt"
314 )
315 if installed_files:
316 base_dir = os.path.realpath(
317 os.path.join(distribution.location, os.path.dirname(installed_files))
318 )
319 to_unlink.extend(
320 os.path.join(base_dir, path)

Callers 2

sync · Method · 0.45
_sync · Method · 0.45

Calls 15

safe_commonpath · Function · 0.90
Error · Class · 0.90
safe_delete · Function · 0.90
reinstall_venv · Function · 0.90
safe_execv · Function · 0.90
Ok · Class · 0.90
append · Method · 0.80
extend · Method · 0.80
iter_metadata_lines · Method · 0.80
lower · Method · 0.80
strip · Method · 0.80

Tested by

no test coverage detected