MCPcopy
hub / github.com/nalepae/pandarallel / WrapWorkFunctionForPipe

Class WrapWorkFunctionForPipe

pandarallel/core.py:113–172  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

111
112
113class WrapWorkFunctionForPipe:
114 def __init__(
115 self,
116 work_function: Callable[
117 [
118 Any,
119 Callable,
120 tuple,
121 Dict[str, Any],
122 Dict[str, Any],
123 ],
124 Any,
125 ],
126 ) -> None:
127 self.work_function = work_function
128
129 def __call__(
130 self,
131 data: Any,
132 progress_bars_type: ProgressBarsType,
133 worker_index: int,
134 master_workers_queue: multiprocessing.Queue,
135 dilled_user_defined_function: bytes,
136 user_defined_function_args: tuple,
137 user_defined_function_kwargs: Dict[str, Any],
138 extra: Dict[str, Any],
139 ) -> Any:
140 try:
141 data_size = len(data)
142 user_defined_function: Callable = dill.loads(dilled_user_defined_function)
143
144 progress_wrapped_user_defined_function = progress_wrapper(
145 user_defined_function, master_workers_queue, worker_index, data_size
146 )
147
148 used_user_defined_function = (
149 progress_wrapped_user_defined_function
150 if progress_bars_type
151 in (
152 ProgressBarsType.InUserDefinedFunction,
153 ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns,
154 )
155 else user_defined_function
156 )
157
158 results = self.work_function(
159 data,
160 used_user_defined_function,
161 user_defined_function_args,
162 user_defined_function_kwargs,
163 extra,
164 )
165
166 master_workers_queue.put((worker_index, WorkerStatus.Success, None))
167
168 return results
169
170 except:

Callers 1

closureFunction · 0.85

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…