Generic polling logic. Args: target: callable that returns truthy if polling condition is met. timeout: max polling time. step: interval between checking target(). Returns: return value of target() if truthy within timeout; False if timeout elapses.
(
target: Callable[[], T],
timeout: TimeoutType = None,
step: TimeoutType = None,
)
| 503 | |
@staticmethod
def _poll_for(
    target: Callable[[], T],
    timeout: TimeoutType = None,
    step: TimeoutType = None,
) -> T | Literal[False]:
    """Generic polling logic.

    Calls ``target`` repeatedly, sleeping ``step`` seconds between attempts,
    until it returns a truthy value or ``timeout`` seconds have elapsed.
    Any ``Exception`` raised by ``target`` is suppressed and treated as an
    unsuccessful attempt. If ``timeout`` is zero or negative, ``target`` is
    never called and False is returned immediately.

    Args:
        target: callable that returns truthy if polling condition is met.
        timeout: max polling time; DEFAULT_TIMEOUT is used when None.
        step: interval between checking target(); POLL_INTERVAL is used
            when None.

    Returns:
        return value of target() if truthy within timeout
        False if timeout elapses
    """
    if timeout is None:
        timeout = DEFAULT_TIMEOUT
    if step is None:
        step = POLL_INTERVAL
    # Measure the deadline on the monotonic clock: time.time() follows the
    # wall clock, so a system clock adjustment during polling could end the
    # wait early or extend it well past the requested timeout.
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        # Best-effort: an exception from target() counts as "not ready yet".
        with contextlib.suppress(Exception):
            success = target()
            if success:
                return success
        time.sleep(step)
    return False
| 533 | |
| 534 | @staticmethod |
| 535 | async def _poll_for_async( |
no outgoing calls
no test coverage detected