Run a driver as a separate process. Args: driver_script: A string to run as a Python script. env: The environment variables for the driver. encode: The encoding to use for the driver script. Returns: The script's output.
(
driver_script: str, env: Dict = None, encode: str = "utf-8"
)
| 372 | |
| 373 | |
| 374 | def run_string_as_driver( |
| 375 | driver_script: str, env: Dict = None, encode: str = "utf-8" |
| 376 | ) -> str: |
| 377 | """Run a driver as a separate process. |
| 378 | |
| 379 | Args: |
| 380 | driver_script: A string to run as a Python script. |
| 381 | env: The environment variables for the driver. |
| 382 | encode: The encoding to use for the driver script. |
| 383 | |
| 384 | Returns: |
| 385 | The script's output. |
| 386 | """ |
| 387 | proc = subprocess.Popen( |
| 388 | [sys.executable, "-"], |
| 389 | stdin=subprocess.PIPE, |
| 390 | stdout=subprocess.PIPE, |
| 391 | stderr=subprocess.STDOUT, |
| 392 | env=env, |
| 393 | ) |
| 394 | with proc: |
| 395 | output = proc.communicate(driver_script.encode(encoding=encode))[0] |
| 396 | if proc.returncode: |
| 397 | print(decode(output, encode_type=encode)) |
| 398 | logger.error(proc.stderr) |
| 399 | raise subprocess.CalledProcessError( |
| 400 | proc.returncode, proc.args, output, proc.stderr |
| 401 | ) |
| 402 | out = decode(output, encode_type=encode) |
| 403 | return out |
| 404 | |
| 405 | |
| 406 | @dataclass |
no test coverage detected
searching dependent graphs…