Constructs a Reader instance and uses it to perform throughput measurements. The function will spawn a new process if ``spawn_new_process`` is set. This is needed to make memory footprint measurements accurate. :param dataset_url: A URL of the dataset to be used for measurements.
(dataset_url, field_regex=None, warmup_cycles_count=300, measure_cycles_count=1000,
pool_type=WorkerPoolType.THREAD, loaders_count=3, profile_threads=False,
read_method=ReadMethod.PYTHON, shuffling_queue_size=500, min_after_dequeue=400,
reader_extra_args=None, spawn_new_process=True)
| 110 | |
| 111 | |
| 112 | def reader_throughput(dataset_url, field_regex=None, warmup_cycles_count=300, measure_cycles_count=1000, |
| 113 | pool_type=WorkerPoolType.THREAD, loaders_count=3, profile_threads=False, |
| 114 | read_method=ReadMethod.PYTHON, shuffling_queue_size=500, min_after_dequeue=400, |
| 115 | reader_extra_args=None, spawn_new_process=True): |
| 116 | """Constructs a Reader instance and uses it to perform throughput measurements. |
| 117 | |
| 118 | The function will spawn a new process if ``spawn_new_process`` is set. This is needed to make memory footprint |
| 119 | measurements accurate. |
| 120 | |
| 121 | :param dataset_url: A url of the dataset to be used for measurements. |
| 122 | :param field_regex: A list of regular expressions. Only fields that match one of the regex patterns will be used |
| 123 | during the benchmark. |
| 124 | :param warmup_cycles_count: Number of warmup cycles. During warmup cycles no measurements are being recorded. |
| 125 | :param measure_cycles_count: Number of measurements cycles. Only time elapsed during measurements cycles are used |
| 126 | in throughput calculations. |
| 127 | :param pool_type: :class:`WorkerPoolType` enum value. |
| 128 | :param loaders_count: Number of threads (same thread is used for IO and decoding). |
| 129 | :param profile_threads: Enables profiling threads. Will print result when thread pool is shut down. |
| 130 | :param read_method: An enum :class:`ReadMethod` that defines whether a :class:`petastorm.reader.Reader` will be |
| 131 | used. |
| 132 | :param shuffling_queue_size: Maximum number of elements in the shuffling queue. |
| 133 | :param min_after_dequeue: Minimum number of elements in a shuffling queue before entries can be read from it. |
| 134 | :param reader_extra_args: Extra arguments that would be passed to Reader constructor. |
| 135 | :param spawn_new_process: This function will respawn itself in a new process if the argument is True. Spawning |
| 136 | a new process is needed to get an accurate memory footprint. |
| 137 | |
| 138 | :return: An instance of ``BenchmarkResult`` namedtuple with the results of the benchmark. The namedtuple has |
| 139 | the following fields: `time_mean`, `samples_per_second`, `memory_info` and `cpu` |
| 140 | """ |
| 141 | if not reader_extra_args: |
| 142 | reader_extra_args = dict() |
| 143 | |
| 144 | if spawn_new_process: |
| 145 | args = copy.deepcopy(locals()) |
| 146 | args['spawn_new_process'] = False |
| 147 | executor = ProcessPoolExecutor(1) |
| 148 | future = executor.submit(reader_throughput, **args) |
| 149 | return future.result() |
| 150 | |
| 151 | logger.info('Arguments: %s', locals()) |
| 152 | |
| 153 | if 'schema_fields' not in reader_extra_args: |
| 154 | unischema_fields = match_unischema_fields(get_schema_from_dataset_url(dataset_url), field_regex) |
| 155 | reader_extra_args['schema_fields'] = unischema_fields |
| 156 | |
| 157 | logger.info('Fields used in the benchmark: %s', str(reader_extra_args['schema_fields'])) |
| 158 | |
| 159 | with make_reader(dataset_url, |
| 160 | num_epochs=None, |
| 161 | reader_pool_type=str(pool_type), workers_count=loaders_count, |
| 162 | **reader_extra_args) as reader: |
| 163 | |
| 164 | if read_method == ReadMethod.PYTHON: |
| 165 | result = _time_warmup_and_work(reader, warmup_cycles_count, measure_cycles_count) |
| 166 | elif read_method == ReadMethod.TF: |
| 167 | result = _time_warmup_and_work_tf(reader, warmup_cycles_count, measure_cycles_count, |
| 168 | shuffling_queue_size, min_after_dequeue) |
| 169 | else: |
searching dependent graphs…