MCPcopy
hub / github.com/nalepae/pandarallel / progress_wrapper

Function progress_wrapper

pandarallel/progress_bars.py:185–218  ·  view source on GitHub ↗

Wrap the function to apply in a function which monitor the part of work already done.

(
    user_defined_function: Callable,
    master_workers_queue: multiprocessing.Queue,
    index: int,
    chunk_size: int,
)

Source from the content-addressed store, hash-verified

183
184
185def progress_wrapper(
186 user_defined_function: Callable,
187 master_workers_queue: multiprocessing.Queue,
188 index: int,
189 chunk_size: int,
190) -> Callable:
191 """Wrap the function to apply in a function which monitor the part of work already
192 done.
193 """
194 counter = count()
195 state = ProgressState(chunk_size)
196
197 def closure(*user_defined_function_args, **user_defined_functions_kwargs):
198 iteration = next(counter)
199
200 if iteration == state.next_put_iteration:
201 time_now = time_ns()
202 master_workers_queue.put_nowait((index, WorkerStatus.Running, iteration))
203
204 delta_t = time_now - state.last_put_time
205 delta_i = iteration - state.last_put_iteration
206
207 state.next_put_iteration += (
208 max(int((delta_i / delta_t) * INTERVAL_NS), 1) if delta_t != 0 else 1
209 )
210
211 state.last_put_iteration = iteration
212 state.last_put_time = time_now
213
214 return user_defined_function(
215 *user_defined_function_args, **user_defined_functions_kwargs
216 )
217
218 return closure

Callers 2

__call__Method · 0.85
__call__Method · 0.85

Calls 1

ProgressStateClass · 0.85

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…