MCPcopy
hub / github.com/uber/petastorm / DummyPool

Class DummyPool

petastorm/workers_pool/dummy_pool.py:20–91  ·  view source on GitHub ↗

This class has pool interface but performs all work in calls to get_results. It is sometimes convenient to substitute a real pool with this dummy implementation. Found this class useful when profiling worker code. When on a separate thread, the worker code was not observable (out of the

Source from the content-addressed store, hash-verified

18
19
20class DummyPool(object):
21 """This class has pool interface but performs all work in calls to get_results. It is sometimes convenient
22 to substitute a real pool with this dummy implementation.
23
24 Found this class useful when profiling worker code. When on a separate thread, the worker code was not observable
25 (out of the box) by the profiler"""
26
27 # Have workers argument just to make compatible with other pool implementations
28 def __init__(self, workers=None):
29 # We just accumulate all ventilated items in the list
30 self._ventilator_queue = []
31
32 # get_results will populate this list
33 self._results_queue = []
34 self._worker = None
35 self._ventilator = None
36 self.workers_count = 1
37
38 def start(self, worker_class, worker_args=None, ventilator=None):
39 # Instantiate a single worker with all the args
40 self._worker = worker_class(0, self._results_queue.append, worker_args)
41
42 if ventilator:
43 self._ventilator = ventilator
44 self._ventilator.start()
45
46 def ventilate(self, *args, **kargs):
47 """Send a work item to a worker process."""
48 self._ventilator_queue.append((args, kargs))
49
50 def get_results(self):
51 """Returns results
52
53 The processing is done on the get_results caller thread if the results queue is empty
54
55 :return: arguments passed to publish_func(...) by a worker
56 """
57
58 if self._results_queue:
59 # We have already calculated result. Just return it
60 return self._results_queue.pop(0)
61 else:
62 # If we don't have any tasks waiting for processing, then indicate empty queue
63 while self._ventilator_queue or (self._ventilator and not self._ventilator.completed()):
64
65 # To prevent a race condition of the ventilator working but not yet placing an item
66 # on the ventilator queue. We block until something is on the ventilator queue.
67 while not self._ventilator_queue:
68 sleep(.1)
69
70 # If we do have some tasks, then process a task from the head of a queue
71 args, kargs = self._ventilator_queue.pop(0)
72 self._worker.process(*args, **kargs)
73
74 if self._ventilator:
75 self._ventilator.processed_item()
76
77 if self._results_queue:

Calls

no outgoing calls

Used in the wild real call sites across dependent graphs

searching dependent graphs…