MCPcopy Index your code
hub / github.com/cloudpipe/cloudpickle / _Worker

Class _Worker

tests/testutils.py:138–169  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

136
137
class _Worker:
    """Run function calls synchronously in a single child process.

    A one-worker ``ProcessPoolExecutor`` is created and warmed up at
    construction time. Calls are shipped to it as pickled payloads via
    ``run``; ``memsize`` reports the worker's resident memory and ``close``
    shuts the pool down.
    """

    def __init__(self, protocol=None):
        """Create the pool and force the worker process to start.

        ``protocol`` is stored and forwarded to ``dumps`` and ``call_func``
        on every ``run`` call.
        """
        self.protocol = protocol
        self.pool = ProcessPoolExecutor(max_workers=1)
        self.pool.submit(id, 42).result()  # start the worker process

    def run(self, func, *args, **kwargs):
        """Synchronous remote function call.

        ``(func, args, kwargs)`` is serialized with ``dumps``, handed to
        ``call_func`` in the worker process, and the returned payload is
        deserialized with ``loads``. If the deserialized result is an
        exception instance it is raised in the caller; otherwise it is
        returned.
        """
        input_payload = dumps((func, args, kwargs), protocol=self.protocol)
        # NOTE(review): call_func is defined elsewhere in this file; it
        # presumably executes the call and returns a serialized result or
        # exception -- confirm against its definition.
        result_payload = self.pool.submit(
            call_func, input_payload, self.protocol
        ).result()
        result = loads(result_payload)

        if isinstance(result, BaseException):
            raise result
        return result

    def memsize(self):
        """Return the resident set size (RSS) of the worker process.

        Returns 0 when the pool has no worker process, including after the
        pool has been shut down. Raises ``RuntimeError`` if more than one
        worker is found.
        """
        processes = self.pool._processes
        if processes is None:
            # CPython's ProcessPoolExecutor.shutdown() resets ``_processes``
            # to None; treat that like an empty pool instead of failing
            # with a TypeError when iterating.
            return 0

        # Snapshot the container first, then accept either process objects
        # (use their ``pid``) or bare pids.
        workers_pids = [getattr(p, "pid", p) for p in list(processes)]
        num_workers = len(workers_pids)
        if num_workers == 0:
            return 0
        if num_workers > 1:
            raise RuntimeError(f"Unexpected number of workers: {num_workers:d}")
        return psutil.Process(workers_pids[0]).memory_info().rss

    def close(self):
        """Shut down the pool, waiting for the worker to finish."""
        self.pool.shutdown(wait=True)
170
171
172@contextmanager

Callers 1

subprocess_worker (Function) · 0.85

Calls

no outgoing calls

Tested by 1

subprocess_worker (Function) · 0.68

Used in the wild: real call sites across dependent graphs

searching dependent graphs…