class MultiThreadMapData(_ParallelMapData):
    """
    Same as :class:`MapData`, but starts threads to run the mapping function.
    This is useful when the mapping function is the bottleneck, but you don't
    want to start processes for the entire dataflow pipeline.

    The semantics of this class are **identical** to those of :class:`MapData`,
    except for the ordering. Threads run in parallel and can take different
    amounts of time to run the mapping function, so the order of datapoints
    is not preserved.

    When ``strict=True``, ``MultiThreadMapData(df, ...)``
    is guaranteed to produce exactly the same set of data as ``MapData(df, ...)``,
    if both are iterated until ``StopIteration``, though the data will be produced
    in a different order.
    The behavior of strict mode is undefined if the given dataflow ``df`` is infinite.

    When ``strict=False``, the data produced by ``MultiThreadMapData(df, ...)``
    is a reordering of the data produced by ``RepeatedData(MapData(df, ...), -1)``.
    In other words, the first pass of ``MultiThreadMapData.__iter__`` may contain
    datapoints from the second pass of ``df.__iter__``.


    Note:
        1. Avoid starting many threads in your main process, to reduce GIL contention.

           The threads will only start in the process which calls :meth:`reset_state()`.
           Therefore you can use ``MultiProcessRunnerZMQ(MultiThreadMapData(...), 1)``
           to reduce GIL contention.
    """
    class _Worker(StoppableThread):
        def __init__(self, inq, outq, evt, map_func):
            super(MultiThreadMapData._Worker, self).__init__(evt)
            self.inq = inq
            self.outq = outq
            self.func = map_func
            self.daemon = True

        def run(self):
            try:
                while True:
                    dp = self.queue_get_stoppable(self.inq)
                    if self.stopped():
                        return
                    # Cannot ignore None here: doing so would lead to unsynced send/recv.
                    obj = self.func(dp)
                    self.queue_put_stoppable(self.outq, obj)
            except Exception:
                if self.stopped():
                    pass  # skip duplicated error messages
                else:
                    raise
            finally:
                self.stop()

    def __init__(self, ds, num_thread=None, map_func=None, *, buffer_size=200, strict=False):
        """
        Args:
            ds (DataFlow): the dataflow to map
            num_thread (int): number of threads to use
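
The ordering behaviour described in the class docstring (the same set of results as a sequential map, but not necessarily in the same order) can be illustrated with a small standalone sketch. It uses only the Python standard library and does not call the library's classes; ``threaded_map`` and its sentinel-based shutdown are hypothetical names and choices made for illustration, not the implementation shown above. Like the worker in the listing, it forwards every result, including ``None``, so that each input yields exactly one output.

```python
import queue
import threading


def threaded_map(items, map_func, num_thread=4):
    """Apply map_func to every item using worker threads.

    Hypothetical helper for illustration only; it is not part of the library.
    Results come back in completion order, not input order.
    """
    inq, outq = queue.Queue(), queue.Queue()
    sentinel = object()  # tells a worker that no more input is coming

    def worker():
        while True:
            dp = inq.get()
            if dp is sentinel:
                return
            # Forward every result, even None, so that each input
            # produces exactly one output.
            outq.put(map_func(dp))

    threads = [threading.Thread(target=worker, daemon=True)
               for _ in range(num_thread)]
    for t in threads:
        t.start()
    for dp in items:
        inq.put(dp)
    for _ in threads:
        inq.put(sentinel)  # one sentinel per worker
    for t in threads:
        t.join()

    # All workers have exited, so the output queue is no longer changing.
    results = []
    while not outq.empty():
        results.append(outq.get())
    return results


data = list(range(20))
sequential = [x * x for x in data]
parallel = threaded_map(data, lambda x: x * x)

# Same collection of results as the sequential map; only the order may differ.
assert sorted(parallel) == sorted(sequential)
```

Comparing the sorted lists rather than the raw lists is the point of the example: a consumer that depends on input order would need to carry an index through the mapping function and reorder afterwards.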