MCPcopy
hub / github.com/volcengine/OpenViking / _shutdown_loop

Function _shutdown_loop

openviking_cli/utils/async_utils.py:34–42  ·  view source on GitHub ↗

Shut down the shared loop on process exit.

()

Source from the content-addressed store, hash-verified

32
33
def _shutdown_loop():
    """Shut down the shared event loop on process exit.

    Does nothing unless the module-level ``_loop`` exists, is still open, and
    ``_loop_thread`` is set. Otherwise it asks the loop to stop, waits up to
    five seconds for ``_loop_thread`` to finish, closes the loop if it has
    actually stopped, and clears both module globals.

    If the loop is still running after the wait (for example, a callback is
    blocking it), the loop is left unclosed rather than raising from the exit
    path.
    """
    global _loop, _loop_thread
    if _loop is None or _loop.is_closed() or _loop_thread is None:
        return

    # The loop is presumably driven by _loop_thread, so stop() has to be
    # handed over with the thread-safe scheduling call.
    _loop.call_soon_threadsafe(_loop.stop)
    _loop_thread.join(timeout=5)

    # join() returns silently on timeout, and close() raises RuntimeError on
    # a loop that is still running; only close once it has really stopped.
    if not _loop.is_running():
        _loop.close()

    _loop = None
    _loop_thread = None
43
44
45def run_async(coro: Coroutine[None, None, T]) -> T:

Callers

nothing calls this directly

Calls 2

joinMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected