MCPcopy
hub / github.com/nalepae/pandarallel / __call__

Method __call__

pandarallel/core.py:57–110  ·  view source on GitHub ↗
(
        self,
        input_file_path: Path,
        output_file_path: Path,
        progress_bars_type: ProgressBarsType,
        worker_index: int,
        master_workers_queue: multiprocessing.Queue,
        dilled_user_defined_function: bytes,
        user_defined_function_args: tuple,
        user_defined_function_kwargs: Dict[str, Any],
        extra: Dict[str, Any],
    )

Source from the content-addressed store, hash-verified

55 self.work_function = work_function
56
def __call__(
    self,
    input_file_path: Path,
    output_file_path: Path,
    progress_bars_type: ProgressBarsType,
    worker_index: int,
    master_workers_queue: multiprocessing.Queue,
    dilled_user_defined_function: bytes,
    user_defined_function_args: tuple,
    user_defined_function_kwargs: Dict[str, Any],
    extra: Dict[str, Any],
) -> None:
    """Process one pickled chunk read from a file and pickle the result to a file.

    The chunk is unpickled from ``input_file_path``, the input file is
    removed, ``self.work_function`` is applied to the chunk, and the result
    is pickled to ``output_file_path``. The outcome is reported on
    ``master_workers_queue`` as a ``(worker_index, status, None)`` tuple.

    Args:
        input_file_path: File holding the pickled chunk. It is deleted by
            this call whether or not loading succeeds.
        output_file_path: File the pickled result is written to.
        progress_bars_type: When it is ``InUserDefinedFunction`` or
            ``InUserDefinedFunctionMultiplyByNumberOfColumns``, the user
            function is replaced by its ``progress_wrapper``-wrapped version.
        worker_index: Identifier of this worker; sent back in every status
            message and passed to ``progress_wrapper``.
        master_workers_queue: Queue on which the final status is put; also
            passed to ``progress_wrapper``.
        dilled_user_defined_function: User function serialized with ``dill``.
        user_defined_function_args: Forwarded to ``self.work_function``.
        user_defined_function_kwargs: Forwarded to ``self.work_function``.
        extra: Forwarded to ``self.work_function``.

    Raises:
        BaseException: Anything raised while processing is re-raised after
            ``WorkerStatus.Error`` has been put on the queue.
    """
    try:
        # Load dataframe from input file
        try:
            with input_file_path.open("rb") as file_descriptor:
                data = pickle.load(file_descriptor)
        finally:
            # Delete input file since we don't need it any more. It will free
            # some RAM since the input file is stored into Shared Memory.
            # Done in ``finally`` so a failed load does not leave the file
            # behind; a file that is already missing is not an error here.
            try:
                input_file_path.unlink()
            except FileNotFoundError:
                pass

        data_size = len(data)
        user_defined_function: Callable = dill.loads(dilled_user_defined_function)

        progress_wrapped_user_defined_function = progress_wrapper(
            user_defined_function, master_workers_queue, worker_index, data_size
        )

        # Only the "in user defined function" progress modes use the wrapper.
        used_user_defined_function = (
            progress_wrapped_user_defined_function
            if progress_bars_type
            in (
                ProgressBarsType.InUserDefinedFunction,
                ProgressBarsType.InUserDefinedFunctionMultiplyByNumberOfColumns,
            )
            else user_defined_function
        )

        result = self.work_function(
            data,
            used_user_defined_function,
            user_defined_function_args,
            user_defined_function_kwargs,
            extra,
        )

        with output_file_path.open("wb") as file_descriptor:
            pickle.dump(result, file_descriptor)

        master_workers_queue.put((worker_index, WorkerStatus.Success, None))

    except BaseException:
        # Deliberately catches everything (including KeyboardInterrupt and
        # SystemExit) so the error status is always reported, then re-raises.
        master_workers_queue.put((worker_index, WorkerStatus.Error, None))
        raise
111
112
113class WrapWorkFunctionForPipe:

Callers

nothing calls this directly

Calls 1

progress_wrapperFunction · 0.85

Tested by

no test coverage detected