MCPcopy
hub / github.com/AstrBotDevs/AstrBot / acquire_in_loop

Method acquire_in_loop

tests/unit/test_session_lock.py:92–97  ·  view source on GitHub ↗

Acquire lock in a new event loop.

(loop_id: int)

Source from the content-addressed store, hash-verified

90 results = []
91
92 async def acquire_in_loop(loop_id: int):
93 """Acquire lock in a new event loop."""
94 async with manager.acquire_lock(session_id):
95 results.append(f"loop-{loop_id}-acquired")
96 await asyncio.sleep(0.05)
97 results.append(f"loop-{loop_id}-released")
98
99 def run_in_thread(loop_id: int):
100 loop = asyncio.new_event_loop()

Callers

nothing calls this directly

Calls 2

acquire_lockMethod · 0.45
appendMethod · 0.45

Tested by

no test coverage detected