| 46 | |
| 47 | |
| 48 | class WrapWorkFunctionForFileSystem: |
| 49 | def __init__( |
| 50 | self, |
| 51 | work_function: Callable[ |
| 52 | [Any, Callable, tuple, Dict[str, Any], Dict[str, Any]], Any |
| 53 | ], |
| 54 | ) -> None: |
| 55 | self.work_function = work_function |
| 56 | |
| 57 | def __call__( |
| 58 | self, |
| 59 | input_file_path: Path, |
| 60 | output_file_path: Path, |
| 61 | progress_bars_type: ProgressBarsType, |
| 62 | worker_index: int, |
| 63 | master_workers_queue: multiprocessing.Queue, |
| 64 | dilled_user_defined_function: bytes, |
| 65 | user_defined_function_args: tuple, |
| 66 | user_defined_function_kwargs: Dict[str, Any], |
| 67 | extra: Dict[str, Any], |
| 68 | ) -> None: |
| 69 | try: |
| 70 | # Load dataframe from input file |
| 71 | with input_file_path.open("rb") as file_descriptor: |
| 72 | data = pickle.load(file_descriptor) |
| 73 | |
| 74 | # Delete input file since we don't need it any more. It will free some RAM |
| 75 | # since the input file is stored into Shared Memory. |
| 76 | input_file_path.unlink() |
| 77 | |
| 78 | data_size = len(data) |
| 79 | user_defined_function: Callable = dill.loads(dilled_user_defined_function) |
| 80 | |
| 81 | progress_wrapped_user_defined_function = progress_wrapper( |
| 82 | user_defined_function, master_workers_queue, worker_index, data_size |
| 83 | ) |
| 84 | |
| 85 | used_user_defined_function = ( |
| 86 | progress_wrapped_user_defined_function |
| 87 | if progress_bars_type |
| 88 | in ( |
| 89 | ProgressBarsType.InUserDefinedFunction, |
| 90 | ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns, |
| 91 | ) |
| 92 | else user_defined_function |
| 93 | ) |
| 94 | |
| 95 | result = self.work_function( |
| 96 | data, |
| 97 | used_user_defined_function, |
| 98 | user_defined_function_args, |
| 99 | user_defined_function_kwargs, |
| 100 | extra, |
| 101 | ) |
| 102 | |
| 103 | with output_file_path.open("wb") as file_descriptor: |
| 104 | pickle.dump(result, file_descriptor) |
| 105 |
no outgoing calls
no test coverage detected
searching dependent graphs…