hub / github.com/reflex-dev/reflex / _poll_for

Method _poll_for

reflex/testing.py:505–532

Generic polling logic.

Args:
    target: callable that returns a truthy value once the polling condition is met.
    timeout: maximum polling time.
    step: interval between calls to target().

Returns:
    The return value of target() if it is truthy within the timeout.
    False if the timeout elapses.

(
    target: Callable[[], T],
    timeout: TimeoutType = None,
    step: TimeoutType = None,
)
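The contract described above can be tried outside of Reflex with a small standalone sketch. Everything here is an illustration, not the library's API: `poll_for` is a hypothetical free-function copy of the loop, `Optional[float]` stands in for `TimeoutType` (whose definition is not shown on this page), and the two default values are assumptions, since the real `DEFAULT_TIMEOUT` and `POLL_INTERVAL` constants are defined elsewhere in reflex/testing.py.

```python
import contextlib
import time
from typing import Callable, Literal, Optional, TypeVar, Union

T = TypeVar("T")

# Assumed values for this sketch only; the real constants are not shown here.
DEFAULT_TIMEOUT = 2.0
POLL_INTERVAL = 0.05


def poll_for(
    target: Callable[[], T],
    timeout: Optional[float] = None,
    step: Optional[float] = None,
) -> Union[T, Literal[False]]:
    """Hypothetical standalone copy of the polling loop."""
    if timeout is None:
        timeout = DEFAULT_TIMEOUT
    if step is None:
        step = POLL_INTERVAL
    deadline = time.time() + timeout
    while time.time() < deadline:
        # Exceptions raised by target() count as "not ready yet".
        with contextlib.suppress(Exception):
            success = target()
            if success:
                return success
        time.sleep(step)
    return False


calls = {"n": 0}


def ready_on_third_call() -> str:
    """Return a falsy value twice, then a truthy one."""
    calls["n"] += 1
    return "ready" if calls["n"] >= 3 else ""


# The truthy return value of target() is passed through unchanged.
result = poll_for(ready_on_third_call, timeout=1.0, step=0.01)

# A target that never becomes truthy yields False once the deadline passes.
timed_out = poll_for(lambda: None, timeout=0.05, step=0.01)
```

Because the helper returns the target's own value, a caller can poll and fetch in one step, for example by returning the located element or string rather than a bare `True`.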

Source from the content-addressed store, hash-verified

@staticmethod
def _poll_for(
    target: Callable[[], T],
    timeout: TimeoutType = None,
    step: TimeoutType = None,
) -> T | Literal[False]:
    """Generic polling logic.

    Args:
        target: callable that returns truthy if polling condition is met.
        timeout: max polling time
        step: interval between checking target()

    Returns:
        return value of target() if truthy within timeout
        False if timeout elapses
    """
    if timeout is None:
        timeout = DEFAULT_TIMEOUT
    if step is None:
        step = POLL_INTERVAL
    deadline = time.time() + timeout
    while time.time() < deadline:
        with contextlib.suppress(Exception):
            success = target()
            if success:
                return success
        time.sleep(step)
    return False
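Two consequences of the loop in the listing above are easy to miss: any exception raised by `target()` is swallowed and treated as "keep polling", and a legitimately falsy result (such as `0` or an empty string) can never be returned, so it is indistinguishable from a timeout. The sketch below demonstrates both with a compact stand-in loop. The name `poll_for`, its default arguments, and the `flaky` target are hypothetical and exist only for this illustration.

```python
import contextlib
import time
from typing import Callable, Literal, TypeVar, Union

T = TypeVar("T")


def poll_for(
    target: Callable[[], T], timeout: float = 0.5, step: float = 0.01
) -> Union[T, Literal[False]]:
    """Compact stand-in for the polling loop (defaults are assumptions)."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        with contextlib.suppress(Exception):
            success = target()
            if success:
                return success
        time.sleep(step)
    return False


attempts = []


def flaky() -> str:
    """Raise on the first two attempts, then succeed."""
    attempts.append(1)
    if len(attempts) < 3:
        raise LookupError("not ready")
    return "found"


# The two LookupErrors are suppressed; the third attempt returns "found".
found = poll_for(flaky)

# A falsy value never satisfies the loop, so this times out with False.
zero = poll_for(lambda: 0, timeout=0.05)

# Wrapping the value in a 1-tuple makes it truthy and recoverable.
wrapped = poll_for(lambda: (0,), timeout=0.05)
```

One design implication: because every `Exception` is suppressed, a genuine bug inside the target surfaces only as a `False` result after the full timeout, so callers typically assert on the return value.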

Callers 15

_poll_for_servers · Method · 0.95
poll_for_content · Method · 0.95
poll_for_value · Method · 0.95
assert_token · Function · 0.80
test_on_drop · Function · 0.80
_assert_token · Function · 0.80
test_connection_banner · Function · 0.80
test_fast_yielding · Function · 0.80
poll_assert_event_order · Function · 0.80
test_linked_state · Function · 0.80

Calls

no outgoing calls

Tested by

no test coverage detected