# Imports this excerpt relies on (assumed; they sit outside the lines shown).
# ExecutionTimeoutError is assumed to be defined elsewhere in the same module.
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError
from functools import wraps


def timeout(timeout_seconds: int):
    """
    Decorator to limit the execution time of a function using threading.

    This implementation is cross-platform (works on Windows) and thread-safe (works when
    called from any thread, not just the main thread), unlike signal-based approaches.

    Args:
        timeout_seconds (`int`): Maximum time in seconds allowed for function execution.

    Raises:
        ExecutionTimeoutError: If the function execution exceeds the timeout period.

    Note:
        If a timeout occurs, the thread running the function cannot be forcefully killed
        in Python, so it will continue running in the background until completion. However,
        the caller will receive an ExecutionTimeoutError and can continue execution.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Create a new ThreadPoolExecutor for each call to avoid threading issues
            executor = ThreadPoolExecutor(max_workers=1)
            try:
                future = executor.submit(func, *args, **kwargs)
                return future.result(timeout=timeout_seconds)
            except FuturesTimeoutError:
                raise ExecutionTimeoutError(
                    f"Code execution exceeded the maximum execution time of {timeout_seconds} seconds"
                )
            finally:
                # Shut down without waiting. Leaving a `with ThreadPoolExecutor(...)` block
                # calls shutdown(wait=True), which would block the caller until the
                # timed-out function finished and so defeat the timeout.
                executor.shutdown(wait=False)

        return wrapper

    return decorator


def get_iterable(obj):
    ...  # body lies outside this excerpt
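The behaviour described in the docstring's Note can be checked with a small, self-contained sketch. It is not the module's own code: `ExecutionTimeoutError` is a stand-in class defined here, `slow`, `fast`, `finished`, and `run_demo` are hypothetical names, and fractional timeouts are used only to keep the demo short (`Future.result` accepts floats). The sketch shuts the executor down with `wait=False`; leaving a `with ThreadPoolExecutor(...)` block would instead wait for the worker to finish before the exception reached the caller.

```python
import time
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FuturesTimeoutError
from functools import wraps


class ExecutionTimeoutError(Exception):
    """Stand-in for the module's own exception (assumed definition)."""


def timeout(timeout_seconds):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            executor = ThreadPoolExecutor(max_workers=1)
            try:
                future = executor.submit(func, *args, **kwargs)
                return future.result(timeout=timeout_seconds)
            except FuturesTimeoutError:
                raise ExecutionTimeoutError(
                    f"Code execution exceeded the maximum execution time of {timeout_seconds} seconds"
                )
            finally:
                # Do not wait for the worker: it cannot be killed and keeps running.
                executor.shutdown(wait=False)

        return wrapper

    return decorator


finished = []  # appended to by the worker thread once it eventually completes


@timeout(0.2)
def slow():
    time.sleep(1.0)
    finished.append("done")


@timeout(1)
def fast(x):
    return x * 2


def run_demo():
    """Call slow() and report whether it timed out and how long the caller waited."""
    start = time.monotonic()
    try:
        slow()
        timed_out = False
    except ExecutionTimeoutError:
        timed_out = True
    return timed_out, time.monotonic() - start
```

In this sketch `run_demo()` returns control to the caller at roughly the 0.2 second deadline, while the worker thread keeps sleeping and appends to `finished` only later. Any side effects of a timed-out function can therefore still happen after the caller has moved on.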
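For contrast with the signal-based approaches the docstring mentions, the sketch below shows one reason they are not thread-safe: CPython only allows signal handlers to be installed from the main thread. The helper name `install_handler_from_worker` is hypothetical, and `SIGINT` is used purely because it exists on every platform; alarm-based timeouts typically rely on `signal.SIGALRM`, which is not available on Windows.

```python
import signal
import threading


def install_handler_from_worker():
    """Try to install a signal handler from a non-main thread.

    Returns the error message if CPython refuses, or None if it succeeded.
    """
    outcome = {}

    def target():
        try:
            signal.signal(signal.SIGINT, signal.default_int_handler)
            outcome["error"] = None
        except ValueError as exc:
            # CPython rejects handler installation outside the main thread.
            outcome["error"] = str(exc)

    worker = threading.Thread(target=target)
    worker.start()
    worker.join()
    return outcome["error"]
```

A thread-based timeout avoids this restriction because it never touches signal handlers; the trade-off is that the timed-out function cannot be interrupted and keeps running.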