Main worker function. Loads and returns all rows matching the predicate from a rowgroup. Looks up the requested piece (a single row-group in a parquet file). If a predicate is specified, columns needed by the predicate are loaded first. If no rows in the rowgroup match the predicate criteria, the rest of the columns are not loaded.
(self, piece_index, worker_predicate, shuffle_row_drop_partition)
| 122 | |
| 123 | # pylint: disable=arguments-differ |
def process(self, piece_index, worker_predicate, shuffle_row_drop_partition):
    """Main worker function. Loads the rows of one row-group piece and publishes them.

    Looks up the requested piece (a single row-group in a parquet file). If a predicate is specified,
    loading is delegated to ``_load_rows_with_predicate``: columns needed by the predicate are loaded
    first, and if no rows in the row-group match the predicate criteria the rest of the columns are not
    loaded. Without a predicate the rows are requested from ``self._local_cache``, with ``_load_rows``
    supplied as the loader callback. If ``self._ngram`` is set, the loaded rows are passed through
    ``form_ngram`` before publishing.

    :param piece_index: Index into ``self._split_pieces`` selecting the piece to load.
    :param worker_predicate: Predicate used to select rows. A falsy value means no predicate is applied.
    :param shuffle_row_drop_partition: A 2-tuple of the current row drop partition and the total number
      of partitions.
    :return: None. A non-empty result is handed to ``self.publish_func``.
    :raises RuntimeError: If a local cache other than ``NullCache`` is combined with a predicate, or
      with a total number of row drop partitions different from 1.
    """

    # The dataset is created lazily on the first call and reused afterwards.
    if not self._dataset:
        self._dataset = pq.ParquetDataset(
            self._dataset_path,
            filesystem=self._filesystem,
            validate_schema=False, filters=self._arrow_filters)

    piece = self._split_pieces[piece_index]

    # Reject unsupported cache configurations before doing any file I/O.
    if not isinstance(self._local_cache, NullCache):
        if worker_predicate:
            raise RuntimeError('Local cache is not supported together with predicates, '
                               'unless the dataset is partitioned by the column the predicate operates on.')
        if shuffle_row_drop_partition[1] != 1:
            raise RuntimeError('Local cache is not supported together with shuffle_row_drop_partitions > 1')

    # Open the parquet file backing this piece through the dataset's filesystem. The handle is closed
    # explicitly once the piece has been processed instead of being left to garbage collection.
    # NOTE(review): assumes the loaders return fully materialized rows that do not read from the file
    # after this method returns - confirm against _load_rows / _load_rows_with_predicate.
    piece_handle = self._dataset.fs.open(piece.path)
    try:
        parquet_file = ParquetFile(piece_handle)

        if worker_predicate:
            all_cols = self._load_rows_with_predicate(parquet_file, piece, worker_predicate,
                                                      shuffle_row_drop_partition)
        else:
            # Using hash of the dataset path with the relative path in order to:
            #  1. Make sure if a common cache serves multiple processes (e.g. redis), we don't have conflicts
            #  2. Dataset path is hashed, to make sure we don't create too long keys, which maybe incompatible
            #     with some cache implementations
            #  3. Still leave relative path and the piece_index in plain text to make it easier to debug
            # self._dataset_path could be a list of urls or a string.
            # NOTE(review): if Iterable is the collections.abc/typing ABC, a plain str also satisfies this
            # check and is joined character by character. Left unchanged on purpose: altering it would
            # change the cache keys of already populated caches.
            _dataset_path_for_hash = "_".join(self._dataset_path) if isinstance(self._dataset_path,
                                                                                Iterable) else self._dataset_path
            cache_key = '{}:{}:{}'.format(hashlib.md5(_dataset_path_for_hash.encode('utf-8')).hexdigest(),
                                          piece.path, piece_index)
            all_cols = self._local_cache.get(cache_key,
                                             lambda: self._load_rows(parquet_file, piece,
                                                                     shuffle_row_drop_partition))

        if self._ngram:
            all_cols = self._ngram.form_ngram(data=all_cols, schema=self._schema)

        # Empty results (e.g. nothing matched) are not published.
        if all_cols:
            self.publish_func(all_cols)
    finally:
        piece_handle.close()
| 176 | |
| 177 | def _load_rows(self, pq_file, piece, shuffle_row_drop_range): |
| 178 | """Loads all rows from a piece""" |
nothing calls this directly
no test coverage detected