MCPcopy
hub / github.com/GetBindu/Bindu / wrapper

Function wrapper

bindu/utils/retry.py:120–146  ·  view source on GitHub ↗
(*args: Any, **kwargs: Any)

Source from the content-addressed store, hash-verified

118 def decorator(func: F) -> F:
119 @wraps(func)
120 async def wrapper(*args: Any, **kwargs: Any) -> Any:
121 # Get retry parameters from settings or use provided values
122 _max_attempts = max_attempts or getattr(app_settings.retry, max_key)
123 _min_wait = min_wait or getattr(app_settings.retry, min_key)
124 _max_wait = max_wait or getattr(app_settings.retry, max_wait_key)
125
126 # Choose wait strategy based on use_jitter
127 wait_strategy = (
128 wait_random_exponential(multiplier=1, min=_min_wait, max=_max_wait)
129 if use_jitter
130 else wait_exponential(multiplier=1, min=_min_wait, max=_max_wait)
131 )
132
133 async for attempt in AsyncRetrying(
134 stop=stop_after_attempt(_max_attempts),
135 wait=wait_strategy,
136 retry=retry_if_exception_type(TRANSIENT_EXCEPTIONS),
137 before_sleep=before_sleep_log(logger, logging.WARNING),
138 after=after_log(logger, logging.INFO),
139 reraise=True,
140 ):
141 with attempt:
142 logger.debug(
143 f"Executing {operation_type} operation {func.__name__} "
144 f"(attempt {attempt.retry_state.attempt_number}/{_max_attempts})"
145 )
146 return await func(*args, **kwargs)
147
148 return wrapper # type: ignore
149

Callers

nothing calls this directly

Calls 1

debugMethod · 0.80

Tested by

no test coverage detected