This wrapper transforms a `reduce` function, which takes as input a list of pandas DataFrames and a user-defined function and returns a pandas DataFrame, into a `reduce` function which takes as input a list of paths where pandas DataFrames are pickled and which returns a pandas DataFrame.
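The behaviour described above can be sketched with a small, standard-library-only example. It is an illustration under stated assumptions, not the project's own test: the wrapper body mirrors the source listing, while `concat_reduce`, `run_demo`, and the use of plain lists in place of pandas DataFrames are hypothetical stand-ins chosen so the snippet runs without pandas. Since the wrapper only calls `pickle.load`, any picklable object should behave the same way.

```python
import pickle
import tempfile
from pathlib import Path
from typing import Any, Callable, Dict, Iterator, List, Tuple


def wrap_reduce_function_for_file_system(
    reduce_function: Callable[[Iterator, Dict[str, Any]], Any]
) -> Callable[[Iterator[Path], Dict[str, Any]], Any]:
    # Same logic as the source listing, repeated here to keep the sketch self-contained.
    def closure(output_file_paths: Iterator[Path], extra: Dict[str, Any]) -> Any:
        def get_dataframe_and_delete_file(file_path: Path) -> Any:
            with file_path.open("rb") as file_descriptor:
                data = pickle.load(file_descriptor)
            file_path.unlink()  # each file is removed once it has been loaded
            return data

        # Generator expression: files are read lazily as the reducer iterates.
        dfs = (get_dataframe_and_delete_file(path) for path in output_file_paths)
        return reduce_function(dfs, extra)

    return closure


def concat_reduce(chunks: Iterator[List[int]], extra: Dict[str, Any]) -> List[int]:
    # Hypothetical reducer: concatenates the chunks in order (stand-in for pd.concat).
    result: List[int] = []
    for chunk in chunks:
        result.extend(chunk)
    return result


def run_demo() -> Tuple[List[int], List[Path]]:
    # Pickle three chunks to disk, reduce them through the wrapper,
    # and report which files are still present afterwards.
    with tempfile.TemporaryDirectory() as tmp_dir:
        paths = []
        for index, chunk in enumerate([[1, 2], [3], [4, 5]]):
            path = Path(tmp_dir) / f"chunk_{index}.pkl"
            with path.open("wb") as file_descriptor:
                pickle.dump(chunk, file_descriptor)
            paths.append(path)

        wrapped = wrap_reduce_function_for_file_system(concat_reduce)
        result = wrapped(iter(paths), {})
        leftover = [path for path in paths if path.exists()]
    return result, leftover
```

Calling `run_demo()` returns the concatenated chunks together with an empty list of leftover files, which shows that the wrapper consumes each pickle file exactly once.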
def wrap_reduce_function_for_file_system(
    reduce_function: Callable[[Iterator, Dict[str, Any]], Any]
) -> Callable[[Iterator[Path], Dict[str, Any]], Any]:
    """This wrapper transforms a `reduce` function which takes as input:
    - A list of pandas DataFrames
    - A user-defined function
    and which returns a pandas DataFrame, into a `reduce` function which takes as input:
    - A list of paths where pandas DataFrames are pickled
    and which returns a pandas DataFrame.
    """

    def closure(output_file_paths: Iterator[Path], extra: Dict[str, Any]) -> Any:
        def get_dataframe_and_delete_file(file_path: Path) -> Any:
            # Load the pickled object, then delete its file so it is consumed only once.
            with file_path.open("rb") as file_descriptor:
                data = pickle.load(file_descriptor)

            file_path.unlink()
            return data

        # Generator expression: each file is read (and deleted) only when
        # reduce_function pulls the next item.
        dfs = (
            get_dataframe_and_delete_file(output_file_path)
            for output_file_path in output_file_paths
        )

        return reduce_function(dfs, extra)

    return closure


def parallelize_with_memory_file_system(