hub / github.com/guelfoweb/knockpy / _run_coro_sync

Function _run_coro_sync

knockpy/engine/runtime.py:2722–2747

Run async code from sync callers (CLI and public API). If already inside an event loop, execute in a helper thread to avoid `RuntimeError: asyncio.run() cannot be called from a running event loop`.
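As an illustration of the error named above, here is a minimal sketch of what happens when `asyncio.run()` is called from code that is already executing inside an event loop. The names `fetch_value` and `caller_inside_loop` are hypothetical and do not come from knockpy.

```python
import asyncio


async def fetch_value() -> int:
    # Hypothetical coroutine standing in for any async workload.
    return 42


async def caller_inside_loop() -> str:
    # This function runs inside a loop, so a nested asyncio.run() is rejected.
    coro = fetch_value()
    try:
        asyncio.run(coro)
    except RuntimeError as exc:
        coro.close()  # the coroutine never started; close it to avoid a warning
        return str(exc)
    return "no error"


message = asyncio.run(caller_inside_loop())
print(message)
```

This is the situation the helper thread is meant to sidestep: a fresh thread has no running loop, so `asyncio.run()` is allowed there.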

_run_coro_sync(coro: Any) -> Any

Source from the content-addressed store, hash-verified

def _run_coro_sync(coro: Any) -> Any:
    """Run async code from sync callers (CLI and public API).

    If already inside an event loop, execute in a helper thread to avoid
    `RuntimeError: asyncio.run() cannot be called from a running event loop`.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)

    outcome: Dict[str, Any] = {}

    def target() -> None:
        try:
            outcome["result"] = asyncio.run(coro)
        except Exception as exc:  # pragma: no cover - fallback path
            outcome["error"] = exc

    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join()

    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("result")
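The two code paths in the listing can be exercised with a small standalone sketch. `run_sync` below is a hypothetical stand-in that mirrors the listed logic so the example runs without knockpy installed; `double`, `failing`, and `outer` are likewise invented for illustration.

```python
import asyncio
import threading
from typing import Any, Dict


def run_sync(coro: Any) -> Any:
    """Hypothetical stand-in mirroring the listed helper."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run() is safe.
        return asyncio.run(coro)

    outcome: Dict[str, Any] = {}

    def target() -> None:
        try:
            outcome["result"] = asyncio.run(coro)
        except Exception as exc:
            outcome["error"] = exc

    thread = threading.Thread(target=target, daemon=True)
    thread.start()
    thread.join()  # blocks the calling thread until the helper finishes

    if "error" in outcome:
        raise outcome["error"]
    return outcome.get("result")


async def double(x: int) -> int:
    await asyncio.sleep(0)
    return x * 2


async def failing() -> None:
    raise ValueError("boom")


# Path 1: plain synchronous caller, no loop running.
direct = run_sync(double(21))


# Path 2: synchronous call made from code already inside a running loop.
async def outer() -> int:
    return run_sync(double(4))


nested = asyncio.run(outer())

# Exceptions raised inside the coroutine are re-raised in the caller.
try:
    run_sync(failing())
    raised = None
except ValueError as exc:
    raised = str(exc)
```

One consequence of this design is that `thread.join()` blocks the calling thread, so when the caller is itself running on an event loop, that loop makes no progress until the helper thread finishes. This suits CLI and one-shot API calls, but is worth keeping in mind for long-running coroutines.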

Callers 4

_run_with_rich_progress · Function · 0.85
main · Function · 0.85
run_recon_test · Function · 0.85
KNOCKPY · Function · 0.85

Calls 1

start · Method · 0.45

Tested by

no test coverage detected