Class ProcessPool

petastorm/workers_pool/process_pool.py, lines 114–312 (github.com/uber/petastorm)

class ProcessPool(object):
    def __init__(self, workers_count, serializer=None, zmq_copy_buffers=True):
        """Initializes a ProcessPool.

        This pool differs from standard Python pool implementations in that the workers are spawned
        without using fork. Some issues with using the JVM-based HDFS driver were observed when the process was forked
        (HDFS could not be accessed from the forked worker if the driver had already been used in the parent process).

        :param workers_count: Number of processes to be spawned
        :param serializer: An object that would be used for data payload serialization when sending data from a worker
          process to the main process. ``PickleSerializer`` is used by default. May use
          :class:`petastorm.reader_impl.ArrowTableSerializer` (should be used together with
          :class:`petastorm.reader.ArrowReader`)
        :param zmq_copy_buffers: When set to False, we will use a zero-memory-copy feature of recv_multipart.
          A downside of using this zero-memory-copy feature is that it does not play well with Python GC, and cases
          were observed where it resulted in wild memory footprint swings. Having the buffers copied is typically a
          safer alternative.
        """
        self._workers = []
        self._ventilator_send = None
        self._control_sender = None
        self.workers_count = workers_count
        self._results_receiver_poller = None
        self._results_receiver = None

        self._ventilated_items = 0
        self._ventilated_items_processed = 0
        self._ventilator = None
        self._serializer = serializer or PickleSerializer()
        self._zmq_copy_buffers = zmq_copy_buffers

    def _create_local_socket_on_random_port(self, context, socket_type):
        """Creates a zmq socket on a random port.

        :param context: zmq context
        :param socket_type: zmq socket type
        :return: A tuple: ``(zmq_socket, endpoint_address)``
        """
        LOCALHOST = 'tcp://127.0.0.1'
        socket = context.socket(socket_type)

        # There are race conditions where the socket can close while zmq is still trying to send messages.
        # This can cause zmq to block indefinitely when sending objects or shutting down. Having the socket
        # linger on close helps prevent this.
        socket.linger = _SOCKET_LINGER_MS

        port = socket.bind_to_random_port(LOCALHOST)
        return socket, '{}:{}'.format(LOCALHOST, port)

    def start(self, worker_class, worker_setup_args=None, ventilator=None):
        """Starts worker processes.

        Will block until all processes subscribe to the worker queue (the messages are distributed by zmq on write,
        so if only one, out of many, workers is up at the time of 'ventilation', the initial load won't be balanced
        between workers). If the workers cannot be started in a timely fashion, an exception is raised.

        :param worker_class: The worker class. The class will be instantiated in the worker process. The
          class must implement the :class:`.WorkerBase` protocol.
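The constructor falls back to ``PickleSerializer`` when no serializer is passed (`serializer or PickleSerializer()`). The sketch below shows the shape of that fallback, assuming a serializer is simply an object with a `serialize` method (payload to bytes, run in the worker) and a `deserialize` method (bytes to payload, run in the main process). `SketchPickleSerializer` and `make_pool_serializer` are hypothetical names, not petastorm classes; check the real serializer classes before relying on these method names.

```python
import pickle


class SketchPickleSerializer(object):
    """Hypothetical stand-in for a pickle-based payload serializer.

    Assumption: a serializer exposes serialize() (object -> bytes) and
    deserialize() (bytes -> object); petastorm's real class may differ.
    """

    def serialize(self, rows):
        # Worker side: turn the payload into bytes that can be sent over a socket.
        return pickle.dumps(rows, protocol=pickle.HIGHEST_PROTOCOL)

    def deserialize(self, serialized_rows):
        # Main-process side: rebuild the payload from the received bytes.
        return pickle.loads(serialized_rows)


def make_pool_serializer(serializer=None):
    # Same fallback idiom as ProcessPool.__init__: use the caller's serializer
    # if one was given, otherwise a pickle-based default.
    return serializer or SketchPickleSerializer()


payload = [{'id': 1, 'value': 'a'}, {'id': 2, 'value': 'b'}]
serializer = make_pool_serializer()
wire_bytes = serializer.serialize(payload)
assert isinstance(wire_bytes, bytes)
assert serializer.deserialize(wire_bytes) == payload
```

One consequence of the `or` idiom: a serializer object that evaluates as false (for example, one whose `__len__` returns 0) would be silently replaced by the default; `serializer if serializer is not None else ...` avoids that edge case.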

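`_create_local_socket_on_random_port` binds a socket to a free port on the loopback interface and returns it together with a `tcp://127.0.0.1:<port>` endpoint string that other processes can connect to. The stdlib-only sketch below mirrors that contract with a plain TCP socket bound to port 0, which asks the operating system for any free port. It is an analog, not the pyzmq call: `create_local_socket_on_random_port` is a hypothetical name, and ZeroMQ's `linger` option (how long unsent messages are kept after the socket is closed) has no direct counterpart here.

```python
import socket

LOCALHOST = '127.0.0.1'


def create_local_socket_on_random_port():
    """Hypothetical stdlib analog: listen on a free loopback port, return (socket, endpoint)."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # Port 0 lets the operating system pick any free port.
    sock.bind((LOCALHOST, 0))
    sock.listen()
    port = sock.getsockname()[1]
    # Same endpoint format that ProcessPool builds: 'tcp://127.0.0.1:<port>'.
    return sock, 'tcp://{}:{}'.format(LOCALHOST, port)


sock, endpoint = create_local_socket_on_random_port()
print(endpoint)  # something like tcp://127.0.0.1:<port>; the port varies per run
sock.close()
```

Returning the endpoint string rather than just the port keeps the caller free of address formatting: it can pass the string straight to the worker processes it spawns.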
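The `start` docstring explains why it blocks until every worker has subscribed: zmq distributes messages at write time, so items ventilated while only one worker is connected all go to that worker and the initial load is unbalanced. The toy model below (plain Python, no zmq) illustrates that effect with round-robin assignment over whichever workers are connected when items are sent, and sketches a wait-with-deadline loop that raises if the workers do not all report in time. `distribute_round_robin`, `wait_for_workers`, the ready queue, and the timeout values are hypothetical; threads stand in for worker processes.

```python
import itertools
import queue
import threading
import time


def distribute_round_robin(items, connected_workers):
    """Toy model of write-time distribution: each item goes to the next *currently connected* worker."""
    if not connected_workers:
        raise ValueError('no connected workers')
    assignments = {worker: [] for worker in connected_workers}
    for item, worker in zip(items, itertools.cycle(connected_workers)):
        assignments[worker].append(item)
    return assignments


def wait_for_workers(ready_queue, workers_count, timeout_s):
    """Block until `workers_count` distinct workers report ready; raise if the deadline passes."""
    deadline = time.monotonic() + timeout_s
    ready = set()
    while len(ready) < workers_count:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise RuntimeError('only {} of {} workers became ready within {}s'.format(
                len(ready), workers_count, timeout_s))
        try:
            ready.add(ready_queue.get(timeout=remaining))
        except queue.Empty:
            pass  # the next iteration sees the expired deadline and raises
    return ready


# Only one of three workers is up when ventilation starts: it receives everything.
print(distribute_round_robin(range(6), ['w0']))  # {'w0': [0, 1, 2, 3, 4, 5]}

# Waiting until all three report ready spreads the same items evenly.
ready_queue = queue.Queue()
threads = [threading.Thread(target=ready_queue.put, args=('w{}'.format(i),)) for i in range(3)]
for t in threads:
    t.start()
ready = wait_for_workers(ready_queue, workers_count=3, timeout_s=5.0)
for t in threads:
    t.join()
print(distribute_round_robin(range(6), sorted(ready)))  # {'w0': [0, 3], 'w1': [1, 4], 'w2': [2, 5]}
```

Using an absolute deadline (rather than a fresh timeout per `get`) bounds the total wait, so a pool whose workers trickle in slowly still fails within `timeout_s` instead of waiting up to `workers_count * timeout_s`.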