This class has pool interface but performs all work in calls to get_results. It is sometimes convenient to substitute a real pool with this dummy implementation. Found this class useful when profiling worker code. When on a separate thread, the worker code was not observable (out of the
| 18 | |
| 19 | |
| 20 | class DummyPool(object): |
| 21 | """This class has pool interface but performs all work in calls to get_results. It is sometimes convenient |
| 22 | to substitute a real pool with this dummy implementation. |
| 23 | |
| 24 | Found this class useful when profiling worker code. When on a separate thread, the worker code was not observable |
| 25 | (out of the box) by the profiler""" |
| 26 | |
| 27 | # Have workers argument just to make compatible with other pool implementations |
| 28 | def __init__(self, workers=None): |
| 29 | # We just accumulate all ventilated items in the list |
| 30 | self._ventilator_queue = [] |
| 31 | |
| 32 | # get_results will populate this list |
| 33 | self._results_queue = [] |
| 34 | self._worker = None |
| 35 | self._ventilator = None |
| 36 | self.workers_count = 1 |
| 37 | |
| 38 | def start(self, worker_class, worker_args=None, ventilator=None): |
| 39 | # Instantiate a single worker with all the args |
| 40 | self._worker = worker_class(0, self._results_queue.append, worker_args) |
| 41 | |
| 42 | if ventilator: |
| 43 | self._ventilator = ventilator |
| 44 | self._ventilator.start() |
| 45 | |
| 46 | def ventilate(self, *args, **kargs): |
| 47 | """Send a work item to a worker process.""" |
| 48 | self._ventilator_queue.append((args, kargs)) |
| 49 | |
| 50 | def get_results(self): |
| 51 | """Returns results |
| 52 | |
| 53 | The processing is done on the get_results caller thread if the results queue is empty |
| 54 | |
| 55 | :return: arguments passed to publish_func(...) by a worker |
| 56 | """ |
| 57 | |
| 58 | if self._results_queue: |
| 59 | # We have already calculated result. Just return it |
| 60 | return self._results_queue.pop(0) |
| 61 | else: |
| 62 | # If we don't have any tasks waiting for processing, then indicate empty queue |
| 63 | while self._ventilator_queue or (self._ventilator and not self._ventilator.completed()): |
| 64 | |
| 65 | # To prevent a race condition of the ventilator working but not yet placing an item |
| 66 | # on the ventilator queue. We block until something is on the ventilator queue. |
| 67 | while not self._ventilator_queue: |
| 68 | sleep(.1) |
| 69 | |
| 70 | # If we do have some tasks, then process a task from the head of a queue |
| 71 | args, kargs = self._ventilator_queue.pop(0) |
| 72 | self._worker.process(*args, **kargs) |
| 73 | |
| 74 | if self._ventilator: |
| 75 | self._ventilator.processed_item() |
| 76 | |
| 77 | if self._results_queue: |
no outgoing calls
searching dependent graphs…