Install or update a plugin. Args: plugin (dict): Plugin info dict; plugins_dir (Path): Plugins directory; is_update (bool, optional): Whether this is an update operation, defaults to False; proxy (str, optional): Proxy server address.
(
plugin: dict,
plugins_dir: Path,
is_update: bool = False,
proxy: str | None = None,
)
| 287 | |
| 288 | |
def manage_plugin(
    plugin: dict,
    plugins_dir: Path,
    is_update: bool = False,
    proxy: str | None = None,
) -> None:
    """Install or update a plugin

    For an update, the existing plugin directory is first copied to a sibling
    ``<target>_backup`` directory. If fetching the repository fails, the
    (possibly half-written) target is removed and the backup is moved back
    into place. The backup is deleted only after the fetch has succeeded.

    Args:
        plugin (dict): Plugin info dict. Must contain ``"name"`` and
            ``"repo"``; an optional ``"local_path"`` is used as the target
            directory when updating.
        plugins_dir (Path): Plugins directory
        is_update (bool, optional): Whether this is an update operation. Defaults to False
        proxy (str, optional): Proxy server address

    Raises:
        click.ClickException: If an update is requested for a plugin whose
            target directory does not exist, or if fetching the repository
            fails (the original error is attached as the cause).

    """
    plugin_name = plugin["name"]
    repo_url = plugin["repo"]

    # If updating and local path exists, use it directly
    if is_update and plugin.get("local_path"):
        target_path = Path(plugin["local_path"])
    else:
        target_path = plugins_dir / plugin_name

    # backup_path is non-None exactly when is_update is true, so the checks
    # below only need to test backup_path.
    backup_path = Path(f"{target_path}_backup") if is_update else None

    # Check if plugin exists
    if is_update and not target_path.exists():
        raise click.ClickException(
            f"Plugin {plugin_name} is not installed and cannot be updated"
        )

    # Backup existing plugin (dropping any stale backup from an earlier run)
    if backup_path is not None:
        if backup_path.exists():
            shutil.rmtree(backup_path)
        shutil.copytree(target_path, backup_path)

    # Only the fetch itself is guarded: anything that fails after the fetch
    # has succeeded must not trigger the rollback below, otherwise a good
    # update would be thrown away.
    try:
        click.echo(
            f"{'Updating' if is_update else 'Downloading'} plugin {plugin_name} from {repo_url}...",
        )
        get_git_repo(repo_url, target_path, proxy)
    except Exception as e:
        # Roll back: discard whatever the failed fetch left behind, then
        # restore the pre-update copy if there is one.
        if target_path.exists():
            shutil.rmtree(target_path, ignore_errors=True)
        if backup_path is not None and backup_path.exists():
            shutil.move(backup_path, target_path)
        raise click.ClickException(
            f"Error {'updating' if is_update else 'installing'} plugin {plugin_name}: {e}",
        ) from e

    # Update succeeded, delete backup. This is best-effort cleanup: a failure
    # here leaves a stale backup directory but must not undo the update.
    if backup_path is not None and backup_path.exists():
        try:
            shutil.rmtree(backup_path)
        except OSError as cleanup_error:
            click.echo(
                f"Warning: could not remove backup {backup_path}: {cleanup_error}",
                err=True,
            )
    click.echo(
        f"Plugin {plugin_name} {'updated' if is_update else 'installed'} successfully"
    )
no test coverage detected