Send a new function task ``fn(*args, **kwargs)`` to the subprocess. Parameters ---------- fn : function The function to be invoked. args : list Positional argument. kwargs : dict Keyword arguments timeout : float
(self, fn, args=(), kwargs=None, timeout=None)
| 213 | return False |
| 214 | |
| 215 | def send(self, fn, args=(), kwargs=None, timeout=None): |
| 216 | """Send a new function task ``fn(*args, **kwargs)`` to the subprocess. |
| 217 | |
| 218 | Parameters |
| 219 | ---------- |
| 220 | fn : function |
| 221 | The function to be invoked. |
| 222 | |
| 223 | args : list |
| 224 | Positional argument. |
| 225 | |
| 226 | kwargs : dict |
| 227 | Keyword arguments |
| 228 | |
| 229 | timeout : float |
| 230 | Timeout value when executing the function |
| 231 | |
| 232 | Note |
| 233 | ---- |
| 234 | The caller must call recv before calling the next send in |
| 235 | order to make sure the timeout and child process exit |
| 236 | won't affect the later requests. |
| 237 | """ |
| 238 | # use cloud pickle |
| 239 | # pylint: disable=import-outside-toplevel |
| 240 | import cloudpickle |
| 241 | |
| 242 | if self._proc is not None and self._maximum_uses and self._remaining_uses == 0: |
| 243 | # Time to recycle the process. |
| 244 | self.kill() |
| 245 | |
| 246 | if self._proc is None: |
| 247 | self._start() |
| 248 | # init |
| 249 | if self._initializer is not None: |
| 250 | self.send(self._initializer, self._initargs) |
| 251 | self.recv() |
| 252 | |
| 253 | # N.B. The initializer doesn't count as a "use" |
| 254 | self._remaining_uses = self._maximum_uses |
| 255 | kwargs = {} if not kwargs else kwargs |
| 256 | data = cloudpickle.dumps((fn, args, kwargs, timeout), protocol=pickle.HIGHEST_PROTOCOL) |
| 257 | try: |
| 258 | self._writer.write(struct.pack("<i", len(data))) |
| 259 | self._writer.write(data) |
| 260 | self._writer.flush() |
| 261 | except OSError: |
| 262 | pass |
| 263 | |
| 264 | if self._remaining_uses: |
| 265 | self._remaining_uses -= 1 |
| 266 | |
| 267 | def _child_process_error(self): |
| 268 | """Raise a child process error.""" |