Args: ds (DataFlow): the input DataFlow. num_prefetch (int): size of the queue that holds prefetched datapoints (required). num_proc (int): number of processes to use (required).
(self, ds, num_prefetch, num_proc)
| 193 | self.queue.put(dp) |
| 194 | |
def __init__(self, ds, num_prefetch, num_proc):
    """
    Args:
        ds (DataFlow): input DataFlow.
        num_prefetch (int): size of the queue to hold prefetched datapoints.
            Required.
        num_proc (int): number of processes to use. Required.

    Raises:
        ValueError: if ``num_proc`` or ``num_prefetch`` is not positive.
    """
    # Validate up front with explicit checks instead of ``assert``: asserts are
    # stripped under ``python -O``, and e.g. num_proc == 0 would then silently
    # build a runner with no worker processes at all.
    if num_proc <= 0:
        raise ValueError("num_proc must be positive, got {}".format(num_proc))
    if num_prefetch <= 0:
        raise ValueError("num_prefetch must be positive, got {}".format(num_prefetch))

    # https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#the-spawn-and-forkserver-start-methods
    # Per the linked docs, Windows starts processes with "spawn", which needs the
    # process objects to be picklable; warn users about that up front.
    if os.name == 'nt':
        # Implicit concatenation (not backslash continuation inside the literal)
        # so no source indentation leaks into the message.
        logger.warn("MultiProcessRunner does support Windows. "
                    "However, Windows requires more strict picklability on processes, "
                    "which may lead to failure on some of the code.")
    super(MultiProcessRunner, self).__init__(ds)
    try:
        self._size = len(ds)
    except NotImplementedError:
        # len(ds) is not implemented for this dataflow; record -1 as the size.
        self._size = -1
    self.num_proc = num_proc
    self.num_prefetch = num_prefetch

    if num_proc > 1:
        # Every worker below is built from the same ``self.ds``; presumably each
        # process iterates its own copy, hence the i.i.d. assumption.
        logger.info("[MultiProcessRunner] Will fork a dataflow more than once. "
                    "This assumes the datapoints are i.i.d.")

    # Queue bounded to num_prefetch items, shared with every worker.
    self.queue = mp.Queue(self.num_prefetch)
    self.procs = [MultiProcessRunner._Worker(self.ds, self.queue, idx)
                  for idx in range(self.num_proc)]
    # NOTE(review): presumably registers the workers to be terminated when the
    # main process exits — confirm against ensure_proc_terminate.
    ensure_proc_terminate(self.procs)
    # NOTE(review): presumably set to True once the workers have been started
    # (e.g. by reset_state) — confirm in the rest of the class.
    self._reset_done = False
| 227 | |
| 228 | def __iter__(self): |
| 229 | for k in itertools.count(): |
nothing calls this directly
no test coverage detected