MCPcopy
hub / github.com/nalepae/pandarallel / __call__

Method __call__

pandarallel/core.py:129–172  ·  view source on GitHub ↗
(
        self,
        data: Any,
        progress_bars_type: ProgressBarsType,
        worker_index: int,
        master_workers_queue: multiprocessing.Queue,
        dilled_user_defined_function: bytes,
        user_defined_function_args: tuple,
        user_defined_function_kwargs: Dict[str, Any],
        extra: Dict[str, Any],
    )

Source from the content-addressed store, hash-verified

127 self.work_function = work_function
128
def __call__(
    self,
    data: Any,
    progress_bars_type: ProgressBarsType,
    worker_index: int,
    master_workers_queue: multiprocessing.Queue,
    dilled_user_defined_function: bytes,
    user_defined_function_args: tuple,
    user_defined_function_kwargs: Dict[str, Any],
    extra: Dict[str, Any],
) -> Any:
    """Apply ``self.work_function`` to ``data`` and post the outcome on the queue.

    The user-defined function arrives serialized and is restored with
    ``dill.loads``. When ``progress_bars_type`` is one of the two
    ``InUserDefinedFunction*`` members, the function handed to
    ``self.work_function`` is the ``progress_wrapper``-wrapped version;
    otherwise the restored function is passed through untouched.

    Args:
        data: Payload forwarded to ``self.work_function``; must support
            ``len()``.
        progress_bars_type: Selects whether the wrapped or the plain
            user-defined function is used.
        worker_index: Identifier included in every message put on
            ``master_workers_queue``.
        master_workers_queue: Queue that receives a
            ``(worker_index, status, None)`` tuple on completion or failure.
        dilled_user_defined_function: The user-defined function, serialized
            in a form ``dill.loads`` can restore.
        user_defined_function_args: Forwarded to ``self.work_function``.
        user_defined_function_kwargs: Forwarded to ``self.work_function``.
        extra: Forwarded to ``self.work_function``.

    Returns:
        Whatever ``self.work_function`` returns.

    Raises:
        BaseException: Anything raised while executing is re-raised
            unchanged after a ``WorkerStatus.Error`` message has been put
            on the queue.
    """
    try:
        item_count = len(data)
        plain_function: Callable = dill.loads(dilled_user_defined_function)

        # The wrapper is always constructed, even if it ends up unused, to
        # keep the call sequence identical regardless of the progress mode.
        function_with_progress = progress_wrapper(
            plain_function, master_workers_queue, worker_index, item_count
        )

        report_from_inside_function = progress_bars_type in (
            ProgressBarsType.InUserDefinedFunction,
            ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns,
        )
        if report_from_inside_function:
            selected_function = function_with_progress
        else:
            selected_function = plain_function

        output = self.work_function(
            data,
            selected_function,
            user_defined_function_args,
            user_defined_function_kwargs,
            extra,
        )

        master_workers_queue.put((worker_index, WorkerStatus.Success, None))

        return output

    except BaseException:
        # Catch-all on purpose: the failure is reported on the queue for any
        # exception (including KeyboardInterrupt / SystemExit) and then
        # propagated unchanged.
        master_workers_queue.put((worker_index, WorkerStatus.Error, None))
        raise
173
174
175def wrap_reduce_function_for_file_system(

Callers

nothing calls this directly

Calls 1

progress_wrapper · Function · 0.85

Tested by

no test coverage detected