| 75 | |
| 76 | |
| 77 | class ThreadPool(object): |
| 78 | def __init__(self, workers_count, results_queue_size=50, profiling_enabled=False, shuffle_rows=False, seed=None): |
| 79 | """Initializes a thread pool. |
| 80 | |
| 81 | TODO: consider using a standard thread pool |
| 82 | (e.g. http://elliothallmark.com/2016/12/23/requests-with-concurrent-futures-in-python-2-7/ as an implementation) |
| 83 | |
| 84 | Originally implemented our own pool to match the interface of ProcessPool (could not find a process pool |
| 85 | implementation that would not use fork) |
| 86 | |
| 87 | :param workers_count: Number of threads |
| 88 | :param profile: Whether to run a profiler on the threads |
| 89 | """ |
| 90 | self._seed = seed |
| 91 | self._shuffle_rows = shuffle_rows |
| 92 | self._workers = [] |
| 93 | self._ventilator_queues = [] |
| 94 | self.workers_count = workers_count |
| 95 | self._results_queue_size = results_queue_size |
| 96 | # Worker threads will watch this event and gracefully shutdown when the event is set |
| 97 | self._stop_event = Event() |
| 98 | self._profiling_enabled = profiling_enabled |
| 99 | |
| 100 | # Count of items ventilated by the pool |
| 101 | self._ventilated_items = 0 |
| 102 | # Count of items ventilated by each worker |
| 103 | self._ventilated_items_by_worker = [0 for _ in range(self.workers_count)] |
| 104 | # Count of items processed by each worker |
| 105 | self._ventilated_items_processed_by_worker = [0 for _ in range(self.workers_count)] |
| 106 | self._ventilator = None |
| 107 | self._get_results_worker_id = 0 |
| 108 | |
| 109 | def start(self, worker_class, worker_args=None, ventilator=None): |
| 110 | """Starts worker threads. |
| 111 | |
| 112 | :param worker_class: A class of the worker class. The class will be instantiated in the worker process. The |
| 113 | class must implement :class:`.WorkerBase` protocol |
| 114 | :param worker_setup_args: Argument that will be passed to ``args`` property of the instantiated |
| 115 | :class:`.WorkerBase` |
| 116 | :return: ``None`` |
| 117 | """ |
| 118 | # Verify stop_event and raise exception if it's already set! |
| 119 | if self._stop_event.is_set(): |
| 120 | raise RuntimeError('ThreadPool({}) cannot be reused! stop_event set? {}' |
| 121 | .format(len(self._workers), self._stop_event.is_set())) |
| 122 | |
| 123 | # Set up a channel for each worker to send work |
| 124 | self._ventilator_queues = [queue.Queue() for _ in range(self.workers_count)] |
| 125 | # Set up a channel for each worker to send results |
| 126 | self._results_queues = [ |
| 127 | queue.Queue(max(5, self._results_queue_size // self.workers_count)) |
| 128 | for _ in range(self.workers_count) |
| 129 | ] |
| 130 | self._workers = [] |
| 131 | for worker_id in range(self.workers_count): |
| 132 | # Create a closure that captures the worker_id for this specific worker |
| 133 | def make_publish_func(worker_id): |
| 134 | return lambda data: self._stop_aware_put(worker_id, data) |
no outgoing calls
searching dependent graphs…