(
self,
distributions, # type: Iterable[Distribution]
confirm=True, # type: bool
retain_pip=False, # type: bool
)
| 261 | command = attr.ib(default=None) # type: Optional[Tuple[str, ...]] |
| 262 | |
| 263 | def sync( |
| 264 | self, |
| 265 | distributions, # type: Iterable[Distribution] |
| 266 | confirm=True, # type: bool |
| 267 | retain_pip=False, # type: bool |
| 268 | ): |
| 269 | # type: (...) -> Result |
| 270 | |
| 271 | abs_venv_dir = os.path.realpath(self.venv.venv_dir) |
| 272 | |
| 273 | existing_distributions_by_project_name = { |
| 274 | dist.metadata.project_name: dist for dist in self.venv.iter_distributions() |
| 275 | } # type: Dict[ProjectName, Distribution] |
| 276 | installed_pip = existing_distributions_by_project_name.get( |
| 277 | self._PIP_PROJECT_NAME |
| 278 | ) # type: Optional[Distribution] |
| 279 | |
| 280 | resolved_pip = None # type: Optional[Distribution] |
| 281 | to_remove = [] # type: List[Distribution] |
| 282 | to_install = [] # type: List[Distribution] |
| 283 | for distribution in distributions: |
| 284 | if self._PIP_PROJECT_NAME == distribution.metadata.project_name: |
| 285 | resolved_pip = distribution |
| 286 | existing_distribution = existing_distributions_by_project_name.pop( |
| 287 | distribution.metadata.project_name, None |
| 288 | ) |
| 289 | if not existing_distribution: |
| 290 | to_install.append(distribution) |
| 291 | elif existing_distribution.metadata.version != distribution.metadata.version: |
| 292 | to_remove.append(existing_distribution) |
| 293 | to_install.append(distribution) |
| 294 | if retain_pip: |
| 295 | existing_distributions_by_project_name.pop(self._PIP_PROJECT_NAME, None) |
| 296 | to_remove.extend(existing_distributions_by_project_name.values()) |
| 297 | |
| 298 | to_unlink_by_pin = ( |
| 299 | OrderedDict() |
| 300 | ) # type: OrderedDict[Tuple[ProjectName, Version], List[Text]] |
| 301 | for distribution in to_remove: |
| 302 | to_unlink = [] # type: List[Text] |
| 303 | if distribution.metadata.type is MetadataType.DIST_INFO: |
| 304 | to_unlink.extend( |
| 305 | os.path.realpath(os.path.join(distribution.location, installed_file.path)) |
| 306 | for installed_file in Record.read( |
| 307 | lines=distribution.iter_metadata_lines("RECORD") |
| 308 | ) |
| 309 | if isinstance(installed_file, InstalledFile) |
| 310 | ) |
| 311 | elif distribution.metadata.type is MetadataType.EGG_INFO: |
| 312 | installed_files = distribution.metadata.files.metadata_file_rel_path( |
| 313 | "installed-files.txt" |
| 314 | ) |
| 315 | if installed_files: |
| 316 | base_dir = os.path.realpath( |
| 317 | os.path.join(distribution.location, os.path.dirname(installed_files)) |
| 318 | ) |
| 319 | to_unlink.extend( |
| 320 | os.path.join(base_dir, path) |
no test coverage detected