(
script: str,
args: Optional[List[str]] = None,
*,
monitor_output: bool = False,
termination_str: Optional[str] = None,
cancel_event: Optional[threading.Event] = None,
allow_continue_on_fail: bool = False,
)
| 12 | |
| 13 | |
| 14 | def run_powershell_script( |
| 15 | script: str, |
| 16 | args: Optional[List[str]] = None, |
| 17 | *, |
| 18 | monitor_output: bool = False, |
| 19 | termination_str: Optional[str] = None, |
| 20 | cancel_event: Optional[threading.Event] = None, |
| 21 | allow_continue_on_fail: bool = False, |
| 22 | ) -> int: |
| 23 | if not os.path.isabs(script): |
| 24 | if getattr(sys, 'frozen', False): |
| 25 | base_path = os.path.dirname(sys.executable) |
| 26 | else: |
| 27 | utilities_dir = os.path.dirname(os.path.abspath(__file__)) |
| 28 | base_path = os.path.dirname(utilities_dir) |
| 29 | embedded_path = os.path.join(base_path, 'debloat_raven_scripts', script) |
| 30 | if os.path.exists(embedded_path): |
| 31 | script_path = embedded_path |
| 32 | else: |
| 33 | temp_dir = os.environ.get('TEMP', tempfile.gettempdir()) |
| 34 | script_path = os.path.join(temp_dir, 'talon', script) |
| 35 | else: |
| 36 | script_path = script |
| 37 | if not os.path.exists(script_path): |
| 38 | msg = f"PowerShell script not found: {script_path}" |
| 39 | logger.error(msg) |
| 40 | raise FileNotFoundError(msg) |
| 41 | cmd = [ |
| 42 | "powershell.exe", |
| 43 | "-NoProfile", |
| 44 | "-ExecutionPolicy", "Bypass", |
| 45 | "-File", script_path |
| 46 | ] + (args or []) |
| 47 | logger.info(f"Launching PowerShell: {' '.join(cmd)}") |
| 48 | try: |
| 49 | creationflags = 0 |
| 50 | proc = subprocess.Popen( |
| 51 | cmd, |
| 52 | stdout=subprocess.PIPE, |
| 53 | stderr=subprocess.PIPE, |
| 54 | text=True, |
| 55 | encoding="utf-8", |
| 56 | errors="replace", |
| 57 | bufsize=1, |
| 58 | creationflags=creationflags, |
| 59 | ) |
| 60 | except Exception as e: |
| 61 | logger.exception(f"Failed to start PowerShell process: {e}") |
| 62 | show_error_popup( |
| 63 | t("errors.powershell_script_launch_failed", {"error": e}), |
| 64 | allow_continue=allow_continue_on_fail, |
| 65 | ) |
| 66 | raise |
| 67 | termination_detected = False |
| 68 | |
| 69 | def _stream(pipe, log_fn, label): |
| 70 | nonlocal termination_detected |
| 71 | for line in iter(pipe.readline, ""): |
no test coverage detected