| 111 | |
| 112 | |
| 113 | class WrapWorkFunctionForPipe: |
def __init__(
    self,
    work_function: Callable[
        [Any, Callable, tuple, Dict[str, Any], Dict[str, Any]],
        Any,
    ],
) -> None:
    """Remember the work function this wrapper delegates to.

    Args:
        work_function: Callable later invoked by ``__call__`` with five
            positional arguments: the data chunk, the user defined function
            (possibly wrapped for progress reporting), that function's
            positional arguments, its keyword arguments, and the ``extra``
            dictionary. Its return value is handed back to the caller
            unchanged.
    """
    # No validation or copying: the callable is stored as-is.
    self.work_function = work_function
| 128 | |
| 129 | def __call__( |
| 130 | self, |
| 131 | data: Any, |
| 132 | progress_bars_type: ProgressBarsType, |
| 133 | worker_index: int, |
| 134 | master_workers_queue: multiprocessing.Queue, |
| 135 | dilled_user_defined_function: bytes, |
| 136 | user_defined_function_args: tuple, |
| 137 | user_defined_function_kwargs: Dict[str, Any], |
| 138 | extra: Dict[str, Any], |
| 139 | ) -> Any: |
| 140 | try: |
| 141 | data_size = len(data) |
| 142 | user_defined_function: Callable = dill.loads(dilled_user_defined_function) |
| 143 | |
| 144 | progress_wrapped_user_defined_function = progress_wrapper( |
| 145 | user_defined_function, master_workers_queue, worker_index, data_size |
| 146 | ) |
| 147 | |
| 148 | used_user_defined_function = ( |
| 149 | progress_wrapped_user_defined_function |
| 150 | if progress_bars_type |
| 151 | in ( |
| 152 | ProgressBarsType.InUserDefinedFunction, |
| 153 | ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns, |
| 154 | ) |
| 155 | else user_defined_function |
| 156 | ) |
| 157 | |
| 158 | results = self.work_function( |
| 159 | data, |
| 160 | used_user_defined_function, |
| 161 | user_defined_function_args, |
| 162 | user_defined_function_kwargs, |
| 163 | extra, |
| 164 | ) |
| 165 | |
| 166 | master_workers_queue.put((worker_index, WorkerStatus.Success, None)) |
| 167 | |
| 168 | return results |
| 169 | |
| 170 | except: |
no outgoing calls
no test coverage detected
searching dependent graphs…