| 112 | |
| 113 | |
| 114 | class ProcessPool(object): |
def __init__(self, workers_count, serializer=None, zmq_copy_buffers=True):
    """Initializes a ProcessPool.

    Unlike the standard Python pool implementations, this pool spawns its workers without using fork.
    Issues were observed with the JVM-based HDFS driver when the process was forked: a forked worker
    could not access HDFS if the driver had already been used in the parent process.

    :param workers_count: Number of processes to be spawned.
    :param serializer: An object used to serialize the data payload sent from a worker process to the
        main process. When ``None`` (the default), a ``PickleSerializer`` is used. May be set to
        :class:`petastorm.reader_impl.ArrowTableSerializer` (which should be used together with
        :class:`petastorm.reader.ArrowReader`).
    :param zmq_copy_buffers: When set to False, the zero-memory-copy feature of ``recv_multipart`` is
        used. A downside of that feature is that it does not play well with the Python GC; cases were
        observed in which it caused wild swings in memory footprint. Copying the buffers (the default)
        is typically the safer choice.
    """
    # Worker bookkeeping and zmq endpoints start empty / None here; they are presumably populated
    # once the pool is started (not visible in this view — confirm against start()).
    self._workers = []
    self._ventilator_send = None
    self._control_sender = None
    self.workers_count = workers_count
    self._results_receiver_poller = None
    self._results_receiver = None

    # Counters initialized to zero; updated elsewhere in the class.
    self._ventilated_items = 0
    self._ventilated_items_processed = 0
    self._ventilator = None
    # Explicit ``None`` check: a caller-supplied serializer must not be silently replaced by the
    # default just because it happens to evaluate as falsy.
    self._serializer = serializer if serializer is not None else PickleSerializer()
    self._zmq_copy_buffers = zmq_copy_buffers
| 144 | |
def _create_local_socket_on_random_port(self, context, socket_type):
    """Creates a zmq socket bound to a random TCP port on the loopback interface.

    If configuring or binding the socket fails, the socket is closed before the error propagates,
    so a failed call does not leak a zmq socket.

    :param context: zmq context used to create the socket.
    :param socket_type: zmq socket type passed to ``context.socket``.
    :return: A tuple: ``(zmq_socket, endpoint_address)``, where ``endpoint_address`` has the form
        ``'tcp://127.0.0.1:<port>'``.
    """
    localhost = 'tcp://127.0.0.1'
    sock = context.socket(socket_type)
    try:
        # There are race conditions where the socket can close while zmq is still trying to send
        # messages. This can cause zmq to block indefinitely when sending objects or shutting down.
        # Having the socket linger on close helps prevent this.
        sock.linger = _SOCKET_LINGER_MS
        port = sock.bind_to_random_port(localhost)
    except Exception:
        # Release the socket if configuration or binding fails (e.g. no free port could be found);
        # otherwise it would stay open, owned by nobody, until the context is terminated.
        sock.close()
        raise
    return sock, '{}:{}'.format(localhost, port)
| 162 | |
| 163 | def start(self, worker_class, worker_setup_args=None, ventilator=None): |
| 164 | """Starts worker processes. |
| 165 | |
| 166 | Will block until all processes to subscribe to the worker queue (the messages are distributed by zmq on write |
| 167 | so if only one, out of many, workers is up at the time of 'ventilation', the initial load won't be balanced |
| 168 | between workers. If can not start the workers in timely fashion, will raise an exception. |
| 169 | |
| 170 | :param worker_class: A class of the worker class. The class will be instantiated in the worker process. The |
| 171 | class must implement :class:`.WorkerBase` protocol. |
no outgoing calls
searching dependent graphs…