| 136 | |
| 137 | |
class _Worker:
    """Run callables synchronously in a dedicated single-process pool.

    Each call is serialized with ``dumps`` using ``protocol``, submitted to
    the pool through ``call_func``, and the payload that comes back is
    decoded with ``loads``.
    """

    def __init__(self, protocol=None):
        """Create the one-worker pool and force its process to start.

        Args:
            protocol: Value passed as ``protocol`` to ``dumps`` and forwarded
                to ``call_func`` on every ``run`` call.
        """
        self.protocol = protocol
        self.pool = ProcessPoolExecutor(max_workers=1)
        self.pool.submit(id, 42).result()  # start the worker process

    def run(self, func, *args, **kwargs):
        """Synchronous remote function call.

        Args:
            func: Callable shipped to the worker.
            *args: Positional arguments shipped alongside ``func``.
            **kwargs: Keyword arguments shipped alongside ``func``.

        Returns:
            The object decoded from the payload returned by the worker.

        Raises:
            BaseException: If the decoded result is itself an exception
                instance, it is raised here instead of being returned.
        """
        input_payload = dumps((func, args, kwargs), protocol=self.protocol)
        # NOTE(review): ``call_func`` is defined elsewhere; presumably it
        # decodes the payload, runs the call and encodes the outcome.
        future = self.pool.submit(call_func, input_payload, self.protocol)
        result = loads(future.result())

        if isinstance(result, BaseException):
            raise result
        return result

    def memsize(self):
        """Return the resident set size (RSS) of the worker process.

        Returns:
            ``0`` when the pool has no worker process (none registered, or
            the pool has already been shut down); otherwise the ``rss``
            field reported by ``psutil`` for the single worker.

        Raises:
            RuntimeError: If the pool reports more than one worker.
        """
        # ``ProcessPoolExecutor.shutdown`` resets ``_processes`` to ``None``;
        # treat that as an empty pool instead of failing with a TypeError.
        processes = self.pool._processes or ()
        # Entries are either objects exposing ``pid`` or the pids themselves.
        workers_pids = [getattr(p, "pid", p) for p in list(processes)]
        num_workers = len(workers_pids)
        if num_workers == 0:
            return 0
        if num_workers > 1:
            raise RuntimeError("Unexpected number of workers: %d" % num_workers)
        return psutil.Process(workers_pids[0]).memory_info().rss

    def close(self):
        """Shut the pool down, waiting for it to finish (``wait=True``)."""
        self.pool.shutdown(wait=True)
| 170 | |
| 171 | |
| 172 | @contextmanager |
no outgoing calls
searching dependent graphs…