MCPcopy
hub / github.com/InternLM/lmdeploy / event_loop

Function event_loop

tests/pytorch/engine/test_engine_sleep.py:72–91  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

70
@pytest.fixture
def event_loop():
    """Provide a fresh asyncio event loop for a test and tear it down after.

    A new loop is created and installed as the current loop, then yielded to
    the test. On teardown, tasks still pending on that loop are cancelled and
    drained, async generators are shut down, the loop is closed, and whatever
    loop was current before the fixture ran is reinstated (``None`` if
    looking it up raised ``RuntimeError``).

    Yields:
        The newly created event loop, already set as the current loop.
    """
    # Remember the previously current loop so it can be reinstated on
    # teardown; a RuntimeError here is treated as "no previous loop".
    try:
        old_loop = asyncio.get_event_loop()
    except RuntimeError:
        old_loop = None

    new_loop = asyncio.new_event_loop()
    try:
        asyncio.set_event_loop(new_loop)
        yield new_loop
    finally:
        try:
            # Cancel whatever the test left running, then run the loop until
            # the cancellations have been delivered. return_exceptions=True
            # keeps the resulting CancelledError (or any task failure) from
            # propagating out of teardown.
            pending = asyncio.all_tasks(new_loop)
            for task in pending:
                task.cancel()
            if pending:
                new_loop.run_until_complete(
                    asyncio.gather(*pending, return_exceptions=True))
            new_loop.run_until_complete(new_loop.shutdown_asyncgens())
        finally:
            # Always close the loop and restore the previous one, even when
            # draining fails (e.g. the test already closed the loop), so a
            # broken teardown cannot leak loop state into later tests.
            new_loop.close()
            asyncio.set_event_loop(old_loop)
92
93
94def _build_sleeping_test_engine(event_loop):

Callers

nothing calls this directly

Calls 4

cancel · Method · 0.45
run_until_complete · Method · 0.45
stop · Method · 0.45
close · Method · 0.45

Tested by

no test coverage detected