一个用于包装 sync function 为 async function 的装饰器 参数: call: 被装饰的同步函数
(call: Callable[P, R])
| 246 | |
| 247 | |
| 248 | def run_sync(call: Callable[P, R]) -> Callable[P, Coroutine[None, None, R]]: |
| 249 | """一个用于包装 sync function 为 async function 的装饰器 |
| 250 | |
| 251 | 参数: |
| 252 | call: 被装饰的同步函数 |
| 253 | """ |
| 254 | |
| 255 | @wraps(call) |
| 256 | async def _wrapper(*args: P.args, **kwargs: P.kwargs) -> R: |
| 257 | return await anyio.to_thread.run_sync( |
| 258 | partial(call, *args, **kwargs), abandon_on_cancel=True |
| 259 | ) |
| 260 | |
| 261 | return _wrapper |
| 262 | |
| 263 | |
| 264 | @asynccontextmanager |
no outgoing calls
no test coverage detected