Acquire lock in a new event loop.
(loop_id: int)
| 90 | results = [] |
| 91 | |
| 92 | async def acquire_in_loop(loop_id: int): |
| 93 | """Acquire lock in a new event loop.""" |
| 94 | async with manager.acquire_lock(session_id): |
| 95 | results.append(f"loop-{loop_id}-acquired") |
| 96 | await asyncio.sleep(0.05) |
| 97 | results.append(f"loop-{loop_id}-released") |
| 98 | |
| 99 | def run_in_thread(loop_id: int): |
| 100 | loop = asyncio.new_event_loop() |
Nothing calls `acquire_in_loop` directly.
No test coverage detected.