MCPcopy Index your code
hub / github.com/tensorpack/tensorpack / __init__

Method __init__

tensorpack/dataflow/parallel.py:195–226  ·  view source on GitHub ↗

Args: ds (DataFlow): input DataFlow. num_prefetch (int): size of the queue to hold prefetched datapoints. Required. num_proc (int): number of processes to use. Required.

(self, ds, num_prefetch, num_proc)

Source from the content-addressed store, hash-verified

193 self.queue.put(dp)
194
def __init__(self, ds, num_prefetch, num_proc):
    """
    Create the prefetch queue and the worker processes that feed it.

    Args:
        ds (DataFlow): input DataFlow.
        num_prefetch (int): size of the queue to hold prefetched
            datapoints. Required; asserted to be positive.
        num_proc (int): number of processes to use. Required; asserted
            to be positive.
    """
    # Background on process start methods:
    # https://docs.python.org/3.6/library/multiprocessing.html?highlight=process#the-spawn-and-forkserver-start-methods
    running_on_windows = (os.name == 'nt')
    if running_on_windows:
        # NOTE(review): message text is kept verbatim; "lead of failure"
        # presumably means "lead to failure" -- confirm before rewording.
        logger.warn(
            "MultiProcessRunner does support Windows. "
            "However, Windows requires more strict picklability on processes, which may "
            "lead of failure on some of the code.")

    super(MultiProcessRunner, self).__init__(ds)

    # -1 records that len(ds) raised NotImplementedError.
    try:
        size = len(ds)
    except NotImplementedError:
        size = -1
    self._size = size

    assert num_proc > 0, num_proc
    assert num_prefetch > 0, num_prefetch
    self.num_proc = num_proc
    self.num_prefetch = num_prefetch

    if num_proc > 1:
        # Each worker is handed the same dataflow, hence the i.i.d. caveat.
        logger.info(
            "[MultiProcessRunner] Will fork a dataflow more than one times. "
            "This assumes the datapoints are i.i.d.")

    # The queue is bounded by num_prefetch and shared by every worker.
    self.queue = mp.Queue(self.num_prefetch)
    worker_cls = MultiProcessRunner._Worker
    self.procs = [
        worker_cls(self.ds, self.queue, worker_idx)
        for worker_idx in range(self.num_proc)
    ]
    ensure_proc_terminate(self.procs)
    self._reset_done = False
227
228 def __iter__(self):
229 for k in itertools.count():

Callers

nothing calls this directly

Calls 2

ensure_proc_terminateFunction · 0.85
__init__Method · 0.45

Tested by

no test coverage detected