MCPcopy
hub / github.com/corpnewt/gibMacOS / _stream_output

Method _stream_output

Scripts/run.py:29–75  ·  view source on GitHub ↗
(self, comm, shell = False)

Source from the content-addressed store, hash-verified

27 return (q,t)
28
def _stream_output(self, comm, shell = False):
    """Run *comm*, echoing its stdout/stderr live while also capturing them.

    Args:
        comm: The command, either a single string or a list of arguments.
            A list is quoted and joined into one string when ``shell`` is
            true; a string is split with ``shlex.split`` when it is false.
        shell: Passed straight through to ``subprocess.Popen``.

    Returns:
        A ``(stdout_text, stderr_text, returncode)`` tuple.  If anything
        fails before the subprocess object exists, the tuple is
        ``("", "Command not found!", 1)``.  If something fails afterwards,
        whatever was captured so far is returned together with the
        process's return code.
    """
    out_parts = []
    err_parts = []
    p = None

    def _pump(source, sink, parts):
        # Move at most one queued chunk to the live stream and the capture
        # list.  Returns the chunk, or "" when nothing was waiting.
        try:
            chunk = source.get_nowait()
        except Empty:
            return ""
        sink.write(chunk)
        parts.append(chunk)
        sink.flush()
        return chunk

    try:
        if shell and isinstance(comm, list):
            comm = " ".join(shlex.quote(x) for x in comm)
        if not shell and isinstance(comm, str):
            comm = shlex.split(comm)
        p = subprocess.Popen(comm, shell=shell, stdout=subprocess.PIPE, stderr=subprocess.PIPE, bufsize=0, universal_newlines=True, close_fds=ON_POSIX)
        # Setup the stdout/stderr reader threads and their queues
        # NOTE(review): assumes _create_thread returns (queue, threading.Thread) - confirm
        q, t = self._create_thread(p.stdout)
        qe, te = self._create_thread(p.stderr)
        # Start both threads
        t.start()
        te.start()

        while True:
            c = _pump(q, sys.stdout, out_parts)
            z = _pump(qe, sys.stderr, err_parts)
            if c or z:
                continue  # Keep going until both queues are empty
            # No output - see if still running
            if p.poll() is not None:
                # Subprocess ended
                break
            # No output, but subprocess still running - stall for 20ms
            time.sleep(0.02)

        # The process has exited, but the readers may still be pushing the
        # last chunks into the queues.  Give them a bounded moment to finish
        # and then drain whatever arrived, so trailing output is not lost.
        t.join(1)
        te.join(1)
        while _pump(q, sys.stdout, out_parts) or _pump(qe, sys.stderr, err_parts):
            pass

        o, e = p.communicate()
        return ("".join(out_parts) + o, "".join(err_parts) + e, p.returncode)
    except BaseException:
        # Deliberately catches everything (as the original bare except did)
        # so callers always get a result tuple back instead of an exception.
        if p:
            try:
                o, e = p.communicate()
            except BaseException:
                o = e = ""
            return ("".join(out_parts) + o, "".join(err_parts) + e, p.returncode)
        return ("", "Command not found!", 1)
76
77 def _decode(self, value, encoding="utf-8", errors="ignore"):
78 # Helper method to only decode if bytes type

Callers 1

runMethod · 0.95

Calls 2

_create_threadMethod · 0.95
writeMethod · 0.80

Tested by

no test coverage detected