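Args: config: same as in :class:`DatasetPredictorBase`. dataset: same as in :class:`DatasetPredictorBase`. nr_proc (int): number of processes to use. use_gpu (bool): use GPU or CPU. If GPU, then ``nr_proc`` cannot be more than what's in CUDA_VISIBLE_DEVICES. ordered (bool): produce outputs in the original order of the datapoints. This will be a bit slower. Otherwise, :meth:`get_result` will produce outputs in any order.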
(self, config, dataset, nr_proc, use_gpu=True, ordered=True)
| 85 | # TODO allow unordered |
| 86 | |
def __init__(self, config, dataset, nr_proc, use_gpu=True, ordered=True):
    """
    Create the input queue, the worker processes and the result queue,
    and start all of them.

    Args:
        config: same as in :class:`DatasetPredictorBase`.
        dataset: same as in :class:`DatasetPredictorBase`.
        nr_proc (int): number of processes to use. Must be at least 1.
        use_gpu (bool): use GPU or CPU.
            If GPU, then ``nr_proc`` cannot be more than what's in
            CUDA_VISIBLE_DEVICES. Blank entries in that variable (an empty
            value, or stray commas) are not counted as devices. When the
            variable is unset, ``get_num_gpu()`` decides how many GPUs
            are available.
        ordered (bool): produce outputs in the original order of the
            datapoints. This will be a bit slower. Otherwise, :meth:`get_result` will produce
            outputs in any order.

    Raises:
        AssertionError: if ``nr_proc`` is smaller than 1, or if ``use_gpu``
            is set and fewer than ``nr_proc`` GPUs are available.
    """
    if config.return_input:
        logger.warn("Using the option `return_input` in MultiProcessDatasetPredictor might be slow")
    assert nr_proc >= 1, nr_proc
    super(MultiProcessDatasetPredictor, self).__init__(config, dataset)

    self.nr_proc = nr_proc
    self.ordered = ordered

    self.inqueue, self.inqueue_proc = dump_dataflow_to_process_queue(
        self.dataset, nr_proc * 2, self.nr_proc)    # put (idx, dp) to inqueue

    if use_gpu:
        visible_devices = os.environ.get('CUDA_VISIBLE_DEVICES')
        if visible_devices is None:
            gpus = list(range(get_num_gpu()))
        else:
            # ''.split(',') is [''] and '0,1,'.split(',') ends with '':
            # discard blank entries so they are neither counted as GPUs
            # nor handed to a worker as a device id.
            gpus = [dev.strip() for dev in visible_devices.split(',') if dev.strip()]
        assert len(gpus) >= self.nr_proc, \
            "nr_proc={} while only {} gpus available".format(
                self.nr_proc, len(gpus))
    else:
        # '-1' is the marker tested below to mean "this worker runs on CPU"
        gpus = ['-1'] * self.nr_proc
    # worker produces (idx, result) to outqueue
    self.outqueue = multiprocessing.Queue()
    self.workers = [MultiProcessQueuePredictWorker(
        i, self.inqueue, self.outqueue, self.config)
        for i in range(self.nr_proc)]

    # start inqueue and workers
    self.inqueue_proc.start()
    # zip() stops at the shorter sequence, so only the first nr_proc
    # device ids are used when more GPUs are visible than workers.
    for p, gpuid in zip(self.workers, gpus):
        if gpuid == '-1':
            logger.info("Worker {} uses CPU".format(p.idx))
        else:
            logger.info("Worker {} uses GPU {}".format(p.idx, gpuid))
        # NOTE(review): presumably change_gpu makes the child process see
        # only this device -- confirm against its definition.
        with change_gpu(gpuid):
            p.start()

    if ordered:
        # an extra process consumes outqueue and serves as the result queue
        self.result_queue = OrderedResultGatherProc(
            self.outqueue, nr_producer=self.nr_proc)
        self.result_queue.start()
        ensure_proc_terminate(self.result_queue)
    else:
        # results are read straight from the workers' output queue
        self.result_queue = self.outqueue
    ensure_proc_terminate(self.workers + [self.inqueue_proc])
nothing calls this directly
no test coverage detected