Remove and return the data from the `FutureObject`. If the optional argument `block` is true and `timeout` is None (the default), block if necessary until an item is available. If `timeout` is a positive number, it blocks at most `timeout` seconds and raises the `parl.remote.exceptions.FutureObjectEmpty` exception if no item was available within that time.
(self, block=True, timeout=None)
| 55 | self.internal_lock = threading.Lock() |
| 56 | |
def get(self, block=True, timeout=None):
    """
    Remove and return the data from the `FutureObject`.

    If `block` is true and `timeout` is None (the default), block if
    necessary until an item is available. If `timeout` is a positive
    number, block at most `timeout` seconds and raise the
    `parl.remote.exceptions.FutureObjectEmpty` exception if no item was
    available within that time. Otherwise (`block` is false), return an
    item if one is immediately available, else raise the
    `parl.remote.exceptions.FutureObjectEmpty` exception (`timeout` is
    still validated, but is not used for waiting in that case).

    Args:
        block: Whether to wait for the data; forwarded to the output
            queue's ``get``.
        timeout: ``None``, or a positive number of seconds to wait;
            forwarded to the output queue's ``get``.

    Returns:
        The item taken from the output queue.

    Raises:
        AssertionError: If `timeout` is given but is not greater than 0.
        FutureGetRepeatedlyError: If the data has already been returned
            by a previous successful call.
        FutureObjectEmpty: If the output queue reports that no item is
            available.
        FutureFunctionError: If the item taken from the queue is itself
            a `FutureFunctionError`; it is re-raised to the caller.
    """
    # Explicit raise instead of `assert` so the check is not stripped under
    # `python -O`. AssertionError is kept so callers observe the same
    # exception type as before. `not timeout > 0` (rather than
    # `timeout <= 0`) keeps rejecting NaN exactly like the old assertion.
    if timeout is not None and not timeout > 0:
        raise AssertionError("`timeout` must be a positive number")

    # The lock is held for the whole operation, including the (possibly
    # blocking) queue read, so concurrent callers are serialized.
    with self.internal_lock:
        if self._already_get:
            raise FutureGetRepeatedlyError()

        try:
            result = self._output_queue.get(block=block, timeout=timeout)
        except queue.Empty:
            raise FutureObjectEmpty

        if isinstance(result, FutureFunctionError):
            # waiting for another thread printing the error message
            time.sleep(0.1)
            # NOTE(review): `_already_get` stays False on this path even
            # though the item has been consumed, so a later blocking call
            # would wait on the queue again -- confirm this is intended.
            raise result

        self._already_get = True
        return result
| 86 | |
| 87 | def get_nowait(self): |
| 88 | """Equivalent to get(block=False). |