MCPcopy
hub / github.com/uber/petastorm / reader_throughput

Function reader_throughput

petastorm/benchmark/throughput.py:112–172  ·  view source on GitHub ↗

Constructs a Reader instance and uses it to performs throughput measurements. The function will spawn a new process if ``spawn_separate_process`` is set. This is needed to make memory footprint measurements accurate. :param dataset_url: A url of the dataset to be used for measurements.

(dataset_url, field_regex=None, warmup_cycles_count=300, measure_cycles_count=1000,
                      pool_type=WorkerPoolType.THREAD, loaders_count=3, profile_threads=False,
                      read_method=ReadMethod.PYTHON, shuffling_queue_size=500, min_after_dequeue=400,
                      reader_extra_args=None, spawn_new_process=True)

Source from the content-addressed store, hash-verified

110
111
112def reader_throughput(dataset_url, field_regex=None, warmup_cycles_count=300, measure_cycles_count=1000,
113 pool_type=WorkerPoolType.THREAD, loaders_count=3, profile_threads=False,
114 read_method=ReadMethod.PYTHON, shuffling_queue_size=500, min_after_dequeue=400,
115 reader_extra_args=None, spawn_new_process=True):
116 """Constructs a Reader instance and uses it to performs throughput measurements.
117
118 The function will spawn a new process if ``spawn_separate_process`` is set. This is needed to make memory footprint
119 measurements accurate.
120
121 :param dataset_url: A url of the dataset to be used for measurements.
122 :param field_regex: A list of regular expressions. Only fields that match one of the regex patterns will be used
123 during the benchmark.
124 :param warmup_cycles_count: Number of warmup cycles. During warmup cycles no measurements are being recorded.
125 :param measure_cycles_count: Number of measurements cycles. Only time elapsed during measurements cycles are used
126 in throughput calculations.
127 :param pool_type: :class:`WorkerPoolType` enum value.
128 :param loaders_count: Number of threads (same thread is used for IO and decoding).
129 :param profile_threads: Enables profiling threads. Will print result when thread pool is shut down.
130 :param read_method: An enum :class:`ReadMethod` that defines whether a :class:`petastorm.reader.Reader` will be
131 used.
132 :param shuffling_queue_size: Maximum number of elements in the shuffling queue.
133 :param min_after_dequeue: Minimum number of elements in a shuffling queue before entries can be read from it.
134 :param reader_extra_args: Extra arguments that would be passed to Reader constructor.
135 :param spawn_new_process: This function will respawn itself in a new process if the argument is True. Spawning
136 a new process is needed to get an accurate memory footprint.
137
138 :return: An instance of ``BenchmarkResult`` namedtuple with the results of the benchmark. The namedtuple has
139 the following fields: `time_mean`, `samples_per_second`, `memory_info` and `cpu`
140 """
141 if not reader_extra_args:
142 reader_extra_args = dict()
143
144 if spawn_new_process:
145 args = copy.deepcopy(locals())
146 args['spawn_new_process'] = False
147 executor = ProcessPoolExecutor(1)
148 future = executor.submit(reader_throughput, **args)
149 return future.result()
150
151 logger.info('Arguments: %s', locals())
152
153 if 'schema_fields' not in reader_extra_args:
154 unischema_fields = match_unischema_fields(get_schema_from_dataset_url(dataset_url), field_regex)
155 reader_extra_args['schema_fields'] = unischema_fields
156
157 logger.info('Fields used in the benchmark: %s', str(reader_extra_args['schema_fields']))
158
159 with make_reader(dataset_url,
160 num_epochs=None,
161 reader_pool_type=str(pool_type), workers_count=loaders_count,
162 **reader_extra_args) as reader:
163
164 if read_method == ReadMethod.PYTHON:
165 result = _time_warmup_and_work(reader, warmup_cycles_count, measure_cycles_count)
166 elif read_method == ReadMethod.TF:
167 result = _time_warmup_and_work_tf(reader, warmup_cycles_count, measure_cycles_count,
168 shuffling_queue_size, min_after_dequeue)
169 else:

Callers 6

_mainFunction · 0.90
test_tf_thread_pool_runFunction · 0.90
test_all_fieldsFunction · 0.90

Calls 5

match_unischema_fieldsFunction · 0.90
make_readerFunction · 0.90
_time_warmup_and_workFunction · 0.85
_time_warmup_and_work_tfFunction · 0.85

Tested by 5

test_tf_thread_pool_runFunction · 0.72
test_all_fieldsFunction · 0.72

Used in the wild real call sites across dependent graphs

searching dependent graphs…