(command, verbose=False, cwd=None, encoding=sys.stdout.encoding)
| 40 | |
| 41 | |
def run_cmd_subprocess(command, verbose=False, cwd=None, encoding=sys.stdout.encoding):
    """Run *command* through the shell, echoing its output as it arrives.

    The child's stderr is merged into stdout. Output is read one character
    at a time so it can be printed immediately, and is also collected so
    the full text can be returned to the caller.

    Args:
        command: Command line string passed to ``subprocess.Popen`` with
            ``shell=True``.
        verbose: When true, print diagnostic information (the command, the
            ``SHELL`` environment value and, on Windows, the parent process
            name) before running.
        cwd: Working directory for the child process, or ``None`` to
            inherit the current one.
        encoding: Text encoding used to decode the child's output;
            undecodable bytes are replaced rather than raising.

    Returns:
        A ``(returncode, output)`` tuple. If anything raises while starting
        or reading the process, ``(1, str(exception))`` is returned instead
        of propagating the error.
    """
    if verbose:
        print("Using run_cmd_subprocess:", command)

    try:
        # Only reported in verbose mode; Popen picks the actual shell itself.
        shell = os.environ.get("SHELL", "/bin/sh")
        parent_process = None

        # Determine the appropriate shell
        if platform.system() == "Windows":
            parent_process = get_windows_parent_process_name()
            if parent_process == "powershell.exe":
                command = f"powershell -Command {command}"

        if verbose:
            print("Running command:", command)
            print("SHELL:", shell)
            if platform.system() == "Windows":
                print("Parent process:", parent_process)

        # The context manager closes the stdout pipe and reaps the child
        # even if reading or printing raises part-way through.
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            shell=True,
            encoding=encoding,
            errors="replace",
            bufsize=0,  # Set bufsize to 0 for unbuffered output
            cwd=cwd,
        ) as process:
            output = []
            while True:
                chunk = process.stdout.read(1)
                if not chunk:
                    break
                print(chunk, end="", flush=True)  # Print the chunk in real-time
                output.append(chunk)  # Store the chunk for later use

            process.wait()

        return process.returncode, "".join(output)
    except Exception as e:
        # Best-effort contract: report failures as a non-zero exit code
        # with the error text rather than raising to the caller.
        return 1, str(e)
| 87 | |
| 88 | |
| 89 | def run_cmd_pexpect(command, verbose=False, cwd=None): |
no test coverage detected