MCPcopy
hub / github.com/uber/petastorm / ThreadPool

Class ThreadPool

petastorm/workers_pool/thread_pool.py:77–263  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

75
76
77class ThreadPool(object):
78 def __init__(self, workers_count, results_queue_size=50, profiling_enabled=False, shuffle_rows=False, seed=None):
79 """Initializes a thread pool.
80
81 TODO: consider using a standard thread pool
82 (e.g. http://elliothallmark.com/2016/12/23/requests-with-concurrent-futures-in-python-2-7/ as an implementation)
83
84 Originally implemented our own pool to match the interface of ProcessPool (could not find a process pool
85 implementation that would not use fork)
86
87 :param workers_count: Number of threads
88 :param profile: Whether to run a profiler on the threads
89 """
90 self._seed = seed
91 self._shuffle_rows = shuffle_rows
92 self._workers = []
93 self._ventilator_queues = []
94 self.workers_count = workers_count
95 self._results_queue_size = results_queue_size
96 # Worker threads will watch this event and gracefully shutdown when the event is set
97 self._stop_event = Event()
98 self._profiling_enabled = profiling_enabled
99
100 # Count of items ventilated by the pool
101 self._ventilated_items = 0
102 # Count of items ventilated by each worker
103 self._ventilated_items_by_worker = [0 for _ in range(self.workers_count)]
104 # Count of items processed by each worker
105 self._ventilated_items_processed_by_worker = [0 for _ in range(self.workers_count)]
106 self._ventilator = None
107 self._get_results_worker_id = 0
108
109 def start(self, worker_class, worker_args=None, ventilator=None):
110 """Starts worker threads.
111
112 :param worker_class: A class of the worker class. The class will be instantiated in the worker process. The
113 class must implement :class:`.WorkerBase` protocol
114 :param worker_setup_args: Argument that will be passed to ``args`` property of the instantiated
115 :class:`.WorkerBase`
116 :return: ``None``
117 """
118 # Verify stop_event and raise exception if it's already set!
119 if self._stop_event.is_set():
120 raise RuntimeError('ThreadPool({}) cannot be reused! stop_event set? {}'
121 .format(len(self._workers), self._stop_event.is_set()))
122
123 # Set up a channel for each worker to send work
124 self._ventilator_queues = [queue.Queue() for _ in range(self.workers_count)]
125 # Set up a channel for each worker to send results
126 self._results_queues = [
127 queue.Queue(max(5, self._results_queue_size // self.workers_count))
128 for _ in range(self.workers_count)
129 ]
130 self._workers = []
131 for worker_id in range(self.workers_count):
132 # Create a closure that captures the worker_id for this specific worker
133 def make_publish_func(worker_id):
134 return lambda data: self._stop_aware_put(worker_id, data)

Calls

no outgoing calls

Used in the wild: real call sites across dependent graphs

searching dependent graphs…