(
self,
data: Any,
progress_bars_type: ProgressBarsType,
worker_index: int,
master_workers_queue: multiprocessing.Queue,
dilled_user_defined_function: bytes,
user_defined_function_args: tuple,
user_defined_function_kwargs: Dict[str, Any],
extra: Dict[str, Any],
)
| 127 | self.work_function = work_function |
| 128 | |
def __call__(
    self,
    data: Any,
    progress_bars_type: ProgressBarsType,
    worker_index: int,
    master_workers_queue: multiprocessing.Queue,
    dilled_user_defined_function: bytes,
    user_defined_function_args: tuple,
    user_defined_function_kwargs: Dict[str, Any],
    extra: Dict[str, Any],
) -> Any:
    """Run ``self.work_function`` on ``data`` and report the outcome on the queue.

    The user-defined function arrives serialized and is restored with
    ``dill.loads``. Depending on ``progress_bars_type`` either the restored
    function itself or the callable returned by ``progress_wrapper`` is
    handed to ``self.work_function``.

    Args:
        data: Input passed to ``self.work_function``; must support ``len()``.
        progress_bars_type: Selects whether the progress-wrapped function is
            used (only for ``InUserDefinedFunction`` and
            ``InUserDefinedFunctionMultiplyByNumberOfColumns``).
        worker_index: Identifier sent along with every status message.
        master_workers_queue: Queue that receives
            ``(worker_index, status, None)`` tuples.
        dilled_user_defined_function: dill-serialized user function.
        user_defined_function_args: Forwarded unchanged to
            ``self.work_function``.
        user_defined_function_kwargs: Forwarded unchanged to
            ``self.work_function``.
        extra: Forwarded unchanged to ``self.work_function``.

    Returns:
        Whatever ``self.work_function`` returns.

    Raises:
        BaseException: Anything raised while processing is re-raised after
            ``WorkerStatus.Error`` has been put on the queue.
    """
    try:
        item_count = len(data)
        plain_function: Callable = dill.loads(dilled_user_defined_function)

        # The wrapper is always built, even when it ends up unused below.
        reporting_function = progress_wrapper(
            plain_function, master_workers_queue, worker_index, item_count
        )

        wrapped_modes = (
            ProgressBarsType.InUserDefinedFunction,
            ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns,
        )
        if progress_bars_type in wrapped_modes:
            function_to_apply = reporting_function
        else:
            function_to_apply = plain_function

        outcome = self.work_function(
            data,
            function_to_apply,
            user_defined_function_args,
            user_defined_function_kwargs,
            extra,
        )

        # Success is signalled inside the try block on purpose: a failure
        # while signalling is itself reported as an error below.
        success_message = (worker_index, WorkerStatus.Success, None)
        master_workers_queue.put(success_message)

        return outcome

    except BaseException:
        # Catch-all (including KeyboardInterrupt/SystemExit): tell the master
        # this worker failed, then let the original exception propagate.
        failure_message = (worker_index, WorkerStatus.Error, None)
        master_workers_queue.put(failure_message)
        raise
| 173 | |
| 174 | |
| 175 | def wrap_reduce_function_for_file_system( |
nothing calls this directly
no test coverage detected