github.com/tensorpack/tensorpack

Class MultiThreadMapData

tensorpack/dataflow/parallel_map.py:92–209



class MultiThreadMapData(_ParallelMapData):
    """
    Same as :class:`MapData`, but start threads to run the mapping function.
    This is useful when the mapping function is the bottleneck, but you don't
    want to start processes for the entire dataflow pipeline.

    The semantics of this class is **identical** to :class:`MapData` except for the ordering.
    Threads run in parallel and can take different time to run the
    mapping function. Therefore the order of datapoints won't be preserved.

    When ``strict=True``, ``MultiThreadMapData(df, ...)``
    is guaranteed to produce the exact set of data as ``MapData(df, ...)``,
    if both are iterated until ``StopIteration``. But the produced data will have different ordering.
    The behavior of strict mode is undefined if the given dataflow ``df`` is infinite.

    When ``strict=False``, the data that's produced by ``MultiThreadMapData(df, ...)``
    is a reordering of the data produced by ``RepeatedData(MapData(df, ...), -1)``.
    In other words, first pass of ``MultiThreadMapData.__iter__`` may contain
    datapoints from the second pass of ``df.__iter__``.


    Note:
        1. You should avoid starting many threads in your main process to reduce GIL contention.

           The threads will only start in the process which calls :meth:`reset_state()`.
           Therefore you can use ``MultiProcessRunnerZMQ(MultiThreadMapData(...), 1)``
           to reduce GIL contention.
    """
    class _Worker(StoppableThread):
        def __init__(self, inq, outq, evt, map_func):
            super(MultiThreadMapData._Worker, self).__init__(evt)
            self.inq = inq
            self.outq = outq
            self.func = map_func
            self.daemon = True

        def run(self):
            try:
                while True:
                    dp = self.queue_get_stoppable(self.inq)
                    if self.stopped():
                        return
                    # cannot ignore None here. will lead to unsynced send/recv
                    obj = self.func(dp)
                    self.queue_put_stoppable(self.outq, obj)
            except Exception:
                if self.stopped():
                    pass    # skip duplicated error messages
                else:
                    raise
            finally:
                self.stop()

    def __init__(self, ds, num_thread=None, map_func=None, *, buffer_size=200, strict=False):
        """
        Args:
            ds (DataFlow): the dataflow to map
            num_thread (int): number of threads to use
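
The `_Worker.run` loop above relies on `queue_get_stoppable` and `queue_put_stoppable` from tensorpack's `StoppableThread`, whose bodies are not shown on this page. The following is a minimal sketch of that pattern, assuming the helpers poll the queue with a short timeout so a blocked thread can still notice the stop event. `StoppableWorker` and its `timeout` parameter are hypothetical names, not tensorpack API. The sketch keeps the rule stated in the source comment: every input produces exactly one output, `None` included, so the consumer can match sends with receives.

```python
import queue
import threading


class StoppableWorker(threading.Thread):
    """Hypothetical stand-in for a StoppableThread-based mapping worker.

    Assumption: the stoppable queue helpers retry with a short timeout so
    the thread can observe the stop event instead of blocking forever.
    """

    def __init__(self, inq, outq, evt, map_func, timeout=0.1):
        super().__init__(daemon=True)
        self.inq, self.outq, self.evt = inq, outq, evt
        self.func = map_func
        self.timeout = timeout

    def stopped(self):
        return self.evt.is_set()

    def stop(self):
        self.evt.set()

    def queue_get_stoppable(self, q):
        # Retry get() until an item arrives; give up (return None) once stopped.
        while not self.stopped():
            try:
                return q.get(timeout=self.timeout)
            except queue.Empty:
                pass
        return None

    def queue_put_stoppable(self, q, obj):
        # Retry put() until there is room; drop the item once stopped.
        while not self.stopped():
            try:
                q.put(obj, timeout=self.timeout)
                return
            except queue.Full:
                pass

    def run(self):
        try:
            while True:
                dp = self.queue_get_stoppable(self.inq)
                if self.stopped():
                    return
                # Forward every result, even None, so each input yields one output.
                self.queue_put_stoppable(self.outq, self.func(dp))
        finally:
            # Signal shutdown to anyone sharing the same event.
            self.stop()
```

Returning `None` from `queue_get_stoppable` on shutdown is harmless here only because `run` checks `stopped()` before using the value.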

Callers 6

get_imagenet_dataflow (function) · 0.90
get_imagenet_dataflow (function) · 0.90
get_train_dataflow (function) · 0.90
get_imagenet_dataflow (function) · 0.90
get_imagenet_dataflow (function) · 0.90
parallel_map.py (file) · 0.85
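
The note in the class docstring depends on threads being created in `reset_state()` rather than in `__init__`: whichever process calls `reset_state()` owns the threads, which is why the docstring suggests `MultiProcessRunnerZMQ(MultiThreadMapData(...), 1)` to keep them out of the main process. The following is a minimal sketch of that lazy-start pattern only. `LazyThreadedStage`, `close()` and the placeholder worker loop are hypothetical and do not reproduce tensorpack's implementation.

```python
import threading


class LazyThreadedStage:
    """Hypothetical sketch: constructing the object starts no threads;
    reset_state() does, in whatever process calls it."""

    def __init__(self, num_thread):
        self.num_thread = num_thread
        self.threads = []          # nothing is started at construction time
        self._stop = threading.Event()

    def _loop(self):
        # Placeholder worker body: just wait until asked to stop.
        self._stop.wait()

    def reset_state(self):
        # Threads are created here, so they live in the calling process.
        self.threads = [threading.Thread(target=self._loop, daemon=True)
                        for _ in range(self.num_thread)]
        for t in self.threads:
            t.start()

    def close(self):
        self._stop.set()
        for t in self.threads:
            t.join()
```

Because construction is cheap and thread-free, the object can be built in the parent process and handed to a separate process that calls `reset_state()` there.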

Calls

no outgoing calls
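
The `strict=False` paragraph in the docstring says the output is a reordering of `RepeatedData(MapData(df, ...), -1)`, so the first pass may already contain datapoints from the second pass of `df`. The single-threaded simulation below sketches why, under two stated assumptions: the in-flight buffer is kept full from an endlessly repeated input, and a last-in-first-out pick stands in for threads finishing out of order. `repeat_forever` and `non_strict_first_pass` are hypothetical helpers, not tensorpack code, and `source` is assumed to be a re-iterable sized collection such as a list.

```python
import itertools


def repeat_forever(source):
    """Like RepeatedData(df, -1): restart the source each time it ends,
    tagging every datapoint with the (0-based) pass it came from."""
    for pass_idx in itertools.count():
        for dp in source:
            yield pass_idx, dp


def non_strict_first_pass(source, func, buffer_size):
    """Simulate one pass of non-strict mode (hypothetical, single-threaded).

    The buffer is pre-filled and refilled from the repeated stream, and the
    most recently submitted item "finishes" first (LIFO) to mimic uneven
    thread timing. One pass emits len(source) results.
    """
    stream = repeat_forever(source)
    in_flight = [next(stream) for _ in range(buffer_size)]
    out = []
    for _ in range(len(source)):
        pass_idx, dp = in_flight.pop()          # pick a "finished" item
        out.append((pass_idx, func(dp)))
        in_flight.append(next(stream))          # refill to keep the buffer full
    return out
```

With `source=[0, 1, 2, 3]`, `func=lambda x: x * 10` and `buffer_size=2`, the pass returns `[(0, 10), (0, 20), (0, 30), (1, 0)]`: the last result comes from the second pass, while datapoint `0` from the first pass is still in flight.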

Tested by

no test coverage detected
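
No test coverage is detected for this class. One property worth checking is the strict-mode guarantee from the docstring: on a finite input, the output is the same collection of data as `MapData`, only in a possibly different order. The sketch below uses a simplified threaded map, `threaded_map_strict` (hypothetical, not tensorpack's implementation), which enqueues all inputs up front instead of using a bounded buffer, has no error handling, forwards `None` from the workers, and drops it on the consumer side, the way `MapData` discards datapoints whose mapped value is `None`.

```python
import queue
import threading

_STOP = object()  # sentinel telling a worker to exit


def threaded_map_strict(data, func, num_thread=4):
    """Map every input exactly once on worker threads; results come back in
    completion order, with None results filtered by the consumer."""
    inq, outq = queue.Queue(), queue.Queue()

    def worker():
        while True:
            dp = inq.get()
            if dp is _STOP:
                return
            # Always emit a result, even None, so the consumer can count replies.
            outq.put(func(dp))

    threads = [threading.Thread(target=worker, daemon=True) for _ in range(num_thread)]
    for t in threads:
        t.start()
    count = 0
    for dp in data:
        inq.put(dp)
        count += 1
    for _ in threads:
        inq.put(_STOP)
    results = []
    for _ in range(count):          # exactly one reply per input
        obj = outq.get()
        if obj is not None:         # filter None here, not in the worker
            results.append(obj)
    for t in threads:
        t.join()
    return results


def same_multiset_as_map(data, func, num_thread=4):
    # Compare sorted lists: strict mode promises the same data, not the same order.
    expected = [y for y in map(func, data) if y is not None]
    return sorted(threaded_map_strict(data, func, num_thread)) == sorted(expected)
```

The comparison assumes the mapped values are sortable; for other outputs, a `collections.Counter` over hashable values would serve the same purpose.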