Run async code from sync callers (CLI and public API). If already inside an event loop, execute in a helper thread to avoid `RuntimeError: asyncio.run() cannot be called from a running event loop`.
(coro: Any)
| 2720 | |
| 2721 | |
def _run_coro_sync(coro: Any) -> Any:
    """Run async code from sync callers (CLI and public API).

    If no event loop is running in the current thread, the coroutine is
    executed directly with ``asyncio.run``.

    If already inside an event loop, execute in a helper thread to avoid
    `RuntimeError: asyncio.run() cannot be called from a running event loop`.
    The calling thread blocks on ``Thread.join()`` until the helper finishes.

    Args:
        coro: The awaitable to execute; it is handed to ``asyncio.run``.

    Returns:
        Whatever ``asyncio.run(coro)`` returns.

    Raises:
        BaseException: Any exception raised while running ``coro`` is
            re-raised in the calling thread. This includes
            ``asyncio.CancelledError`` and ``SystemExit``, which do not
            derive from ``Exception``.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe here.
        return asyncio.run(coro)

    # Shared slot the helper thread uses to hand back its result or failure.
    outcome: Dict[str, Any] = {}

    def target() -> None:
        try:
            outcome["result"] = asyncio.run(coro)
        # BaseException (not Exception): CancelledError/SystemExit would
        # otherwise end the thread with nothing recorded, and the caller
        # would get None instead of the failure.
        except BaseException as exc:  # pragma: no cover - fallback path
            outcome["error"] = exc

    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join()

    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("result")
| 2748 | |
| 2749 | |
| 2750 | def KNOCKPY( |
no test coverage detected