Repository: github.com/uber/petastorm

Method process

petastorm/py_dict_reader_worker.py, lines 124–175

Main worker function. Loads all rows matching the predicate from a row group and publishes them through self.publish_func (the method itself returns nothing). It looks up the requested piece (a single row group in a Parquet file). If a predicate is specified, the columns needed by the predicate are loaded first; if no rows in the row group match the predicate, the remaining columns are not loaded.

(self, piece_index, worker_predicate, shuffle_row_drop_partition)


```python
# Imports this excerpt relies on (assumed; the module header is not shown here)
import hashlib
from collections.abc import Iterable

import pyarrow.parquet as pq
from pyarrow.parquet import ParquetFile

from petastorm.cache import NullCache


# Method of the reader worker class (enclosing class not shown)
# pylint: disable=arguments-differ
def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
    """Main worker function. Loads all rows matching the predicate from a row group and publishes them.

    Looks up the requested piece (a single row group in a Parquet file). If a predicate is specified,
    the columns needed by the predicate are loaded first. If no rows in the row group match the predicate
    criteria, the rest of the columns are not loaded.

    :param piece_index: Index into self._split_pieces of the piece (row group) to load.
    :param worker_predicate: Optional row predicate. If falsy, all rows of the piece are loaded.
    :param shuffle_row_drop_partition: A 2-tuple of the current row-drop partition and the total number
        of partitions.
    :return: None. Loaded rows, if any, are passed to self.publish_func.
    """

    if not self._dataset:
        self._dataset = pq.ParquetDataset(
            self._dataset_path,
            filesystem=self._filesystem,
            validate_schema=False, filters=self._arrow_filters)

    piece = self._split_pieces[piece_index]

    # Open the Parquet file containing this piece through the dataset's filesystem
    parquet_file = ParquetFile(self._dataset.fs.open(piece.path))

    if not isinstance(self._local_cache, NullCache):
        if worker_predicate:
            raise RuntimeError('Local cache is not supported together with predicates, '
                               'unless the dataset is partitioned by the column the predicate operates on.')
        if shuffle_row_drop_partition[1] != 1:
            raise RuntimeError('Local cache is not supported together with shuffle_row_drop_partitions > 1')

    if worker_predicate:
        all_cols = self._load_rows_with_predicate(parquet_file, piece, worker_predicate, shuffle_row_drop_partition)
    else:
        # The cache key combines a hash of the dataset path with the relative piece path in order to:
        # 1. Avoid key conflicts when a common cache (e.g. redis) serves multiple processes
        # 2. Hash the dataset path so keys do not grow too long, which may be incompatible with
        #    some cache implementations
        # 3. Keep the relative path and piece_index in plain text to make debugging easier
        # self._dataset_path could be a list of urls or a string.
        # Note: a str is itself Iterable, so a single path string also takes the join branch and is
        # hashed as its characters joined by '_' (the key is still deterministic).
        _dataset_path_for_hash = "_".join(self._dataset_path) if isinstance(self._dataset_path,
                                                                            Iterable) else self._dataset_path
        cache_key = '{}:{}:{}'.format(hashlib.md5(_dataset_path_for_hash.encode('utf-8')).hexdigest(),
                                      piece.path, piece_index)
        all_cols = self._local_cache.get(cache_key,
                                         lambda: self._load_rows(parquet_file, piece, shuffle_row_drop_partition))

    if self._ngram:
        all_cols = self._ngram.form_ngram(data=all_cols, schema=self._schema)

    if all_cols:
        self.publish_func(all_cols)
```
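When no predicate is given, process builds a key of the form md5(dataset path):piece path:piece index and calls self._local_cache.get(cache_key, fill_callback), so the row group is loaded only on a cache miss. The sketch below mirrors that key format and the two-argument get call with a hypothetical in-memory DictCache. Both names are illustrative, and unlike process, make_cache_key checks for str first so a single path is hashed whole. The key carries no predicate or partition component, which is a plausible reason process refuses to combine the local cache with predicates or with more than one row-drop partition: a filtered or partial result stored under that key would be returned to a later request for different rows.

```python
import hashlib


class DictCache:
    """Hypothetical in-memory cache with a get(key, fill_cache_func) call shape."""

    def __init__(self):
        self._store = {}

    def get(self, key, fill_cache_func):
        # On a miss, compute the value once and store it; on a hit, return the stored value.
        if key not in self._store:
            self._store[key] = fill_cache_func()
        return self._store[key]


def make_cache_key(dataset_path, piece_path, piece_index):
    """Build '<md5 of dataset path>:<piece path>:<piece index>' (hypothetical helper).

    A single string is hashed as-is; a list of URLs is joined with '_' first.
    """
    if isinstance(dataset_path, str):
        path_for_hash = dataset_path
    else:
        path_for_hash = "_".join(dataset_path)
    digest = hashlib.md5(path_for_hash.encode("utf-8")).hexdigest()
    return "{}:{}:{}".format(digest, piece_path, piece_index)
```

Calling get twice with the same key invokes the fill callback only once; only the dataset path is hashed, so the piece path and index stay readable when inspecting the cache.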

Callers

nothing calls this directly

Calls 6

_load_rows · method · 0.95
form_ngram · method · 0.80
join · method · 0.45
encode · method · 0.45
get · method · 0.45

Tested by

no test coverage detected
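The shuffle_row_drop_partition argument of process is documented as a 2-tuple of the current row-drop partition and the total number of partitions, and process only allows the local cache when the total is 1, i.e. when the whole row group is read. The hypothetical helper below illustrates one way such a tuple could select a subset of a row group's rows. It assumes contiguous, near-equal ranges per partition; that scheme is an assumption for illustration, and petastorm's _load_rows may partition rows differently.

```python
def rows_in_partition(num_rows, shuffle_row_drop_partition):
    """Return the row indices of one row group that fall in the given partition.

    Assumption (not taken from petastorm): rows are split into contiguous,
    near-equal ranges, one range per partition.
    """
    this_partition, num_partitions = shuffle_row_drop_partition
    if not 0 <= this_partition < num_partitions:
        raise ValueError("partition index out of range")
    # Partition k covers rows floor(k * n / p) up to (but excluding) floor((k + 1) * n / p).
    start = this_partition * num_rows // num_partitions
    stop = (this_partition + 1) * num_rows // num_partitions
    return list(range(start, stop))
```

With a single partition, the range covers every row, which is the only configuration in which process permits caching.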