Wrap the function to apply in a function which monitor the part of work already done.
(
user_defined_function: Callable,
master_workers_queue: multiprocessing.Queue,
index: int,
chunk_size: int,
)
| 183 | |
| 184 | |
| 185 | def progress_wrapper( |
| 186 | user_defined_function: Callable, |
| 187 | master_workers_queue: multiprocessing.Queue, |
| 188 | index: int, |
| 189 | chunk_size: int, |
| 190 | ) -> Callable: |
| 191 | """Wrap the function to apply in a function which monitor the part of work already |
| 192 | done. |
| 193 | """ |
| 194 | counter = count() |
| 195 | state = ProgressState(chunk_size) |
| 196 | |
| 197 | def closure(*user_defined_function_args, **user_defined_functions_kwargs): |
| 198 | iteration = next(counter) |
| 199 | |
| 200 | if iteration == state.next_put_iteration: |
| 201 | time_now = time_ns() |
| 202 | master_workers_queue.put_nowait((index, WorkerStatus.Running, iteration)) |
| 203 | |
| 204 | delta_t = time_now - state.last_put_time |
| 205 | delta_i = iteration - state.last_put_iteration |
| 206 | |
| 207 | state.next_put_iteration += ( |
| 208 | max(int((delta_i / delta_t) * INTERVAL_NS), 1) if delta_t != 0 else 1 |
| 209 | ) |
| 210 | |
| 211 | state.last_put_iteration = iteration |
| 212 | state.last_put_time = time_now |
| 213 | |
| 214 | return user_defined_function( |
| 215 | *user_defined_function_args, **user_defined_functions_kwargs |
| 216 | ) |
| 217 | |
| 218 | return closure |
no test coverage detected
searching dependent graphs…