MCPcopy Index your code
hub / github.com/bugy/script-server / POpenProcessWrapper

Class POpenProcessWrapper

src/execution/process_popen.py:25–104  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

23
24
25class POpenProcessWrapper(process_base.ProcessWrapper):
def __init__(self, command, working_directory, all_env_variables):
    """Create the wrapper by forwarding every argument to the base class.

    No Popen-specific state is set up here; the three arguments are
    handed, unchanged and in the same order, to the parent constructor.
    """
    super().__init__(
        command,
        working_directory,
        all_env_variables,
    )
28
def start_execution(self, command, working_directory):
    """Spawn the child process and store it on ``self.process``.

    On Windows (as reported by ``os_utils.is_win()``) the command and the
    ``shell`` flag are both taken from ``prepare_cmd_for_win``; otherwise
    the command is used as given with ``shell=False``.

    The child is started in ``working_directory`` with piped stdin and
    stdout, stderr redirected into stdout, text-mode streams whose
    decoding errors are replaced, a new session, and the environment
    returned by ``self.prepare_env_variables()``.
    """
    if os_utils.is_win():
        launch_command, use_shell = prepare_cmd_for_win(command)
    else:
        launch_command, use_shell = command, False

    child_env = self.prepare_env_variables()

    popen_options = {
        'cwd': working_directory,
        'stdin': subprocess.PIPE,
        'stdout': subprocess.PIPE,
        # stderr shares the stdout pipe, so there is a single stream to read
        'stderr': subprocess.STDOUT,
        'start_new_session': True,
        'universal_newlines': True,
        'shell': use_shell,
        'env': child_env,
        'errors': 'replace',
    }
    self.process = subprocess.Popen(launch_command, **popen_options)
47
def write_to_input(self, value):
    """Send ``value`` to the process stdin as a newline-terminated line.

    Does nothing when ``self.is_finished()`` is true. Otherwise a trailing
    newline is appended if ``value`` lacks one, the resulting text is
    passed to ``self._write_script_output`` and then written to the
    process stdin, which is flushed immediately.
    """
    if self.is_finished():
        return

    line = value if value.endswith("\n") else value + "\n"

    # The text is passed to the script output before it reaches the process
    self._write_script_output(line)

    child_stdin = self.process.stdin
    child_stdin.write(line)
    child_stdin.flush()
59 self.process.stdin.flush()
60
def wait_finish(self):
    """Wait on the stored process by calling its ``wait()`` method.

    The value returned by ``wait()`` is discarded; this method returns
    ``None``.
    """
    child = self.process
    child.wait()
63
64 def pipe_process_output(self):
65 try:
66 while True:
67 finished = False
68 wait_new_output = False
69
70 if self.is_finished():
71 data = self.process.stdout.read()
72
73 finished = True
74
75 else:
76 data = self.process.stdout.read(1)
77
78 if not data:
79 wait_new_output = True
80
81 if data:
82 output_text = data

Callers 2

testMethod · 0.90

Calls

no outgoing calls

Tested by 2

testMethod · 0.72