hub / github.com/AstrBotDevs/AstrBot / acquire_and_capture

Method acquire_and_capture

tests/unit/test_session_lock.py:362–371
()

Source from the content-addressed store, hash-verified

360  try:
361
362      async def acquire_and_capture():
363          # Get the per-loop manager
364          per_loop_mgr = manager._get_loop_manager()
365          # Capture the lock object id before acquiring
366          async with per_loop_mgr._access_lock:
367              lock = per_loop_mgr._locks[session_id]
368          with lock_id_lock:
369              lock_ids.add(id(lock))
370          async with manager.acquire_lock(session_id):
371              await asyncio.sleep(0.01)
372
373      loop.run_until_complete(acquire_and_capture())
374  finally:
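
The excerpt depends on names defined elsewhere in the test (`manager`, `session_id`, `lock_ids`, `lock_id_lock`, `loop`), so it cannot run on its own. The following is a minimal, self-contained sketch of the same pattern: each thread runs its own event loop, records the `id()` of the lock it would use for a session, and then acquires that lock. `SessionLockManager` and `PerLoopLocks` here are hypothetical stand-ins written for illustration, not AstrBot's actual classes. Only the names `_get_loop_manager`, `_access_lock`, `_locks`, and `acquire_lock` come from the listing, and their behavior below is an assumption.

```python
import asyncio
import threading
from collections import defaultdict
from contextlib import asynccontextmanager


class PerLoopLocks:
    """Hypothetical stand-in: the session locks owned by one event loop."""

    def __init__(self):
        self._access_lock = asyncio.Lock()
        self._locks = defaultdict(asyncio.Lock)


class SessionLockManager:
    """Hypothetical stand-in: keeps one PerLoopLocks per running loop."""

    def __init__(self):
        self._managers = {}
        self._guard = threading.Lock()  # protects _managers across threads

    def _get_loop_manager(self):
        loop = asyncio.get_running_loop()
        with self._guard:
            if loop not in self._managers:
                self._managers[loop] = PerLoopLocks()
            return self._managers[loop]

    @asynccontextmanager
    async def acquire_lock(self, session_id):
        mgr = self._get_loop_manager()
        async with mgr._access_lock:
            lock = mgr._locks[session_id]
        async with lock:
            yield


def capture_lock_ids(num_threads=3, session_id="session-1"):
    """Run one event loop per thread and collect the ids of the locks used."""
    manager = SessionLockManager()
    lock_ids = set()
    lock_id_lock = threading.Lock()  # guards lock_ids across threads

    def worker():
        loop = asyncio.new_event_loop()
        try:

            async def acquire_and_capture():
                per_loop_mgr = manager._get_loop_manager()
                # Look up the lock object before acquiring it
                async with per_loop_mgr._access_lock:
                    lock = per_loop_mgr._locks[session_id]
                with lock_id_lock:
                    lock_ids.add(id(lock))
                async with manager.acquire_lock(session_id):
                    await asyncio.sleep(0.01)

            loop.run_until_complete(acquire_and_capture())
        finally:
            loop.close()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return lock_ids


if __name__ == "__main__":
    print(len(capture_lock_ids()))
```

In this sketch the stand-in manager keeps every loop's locks alive until `capture_lock_ids` returns, so the collected ids cannot be reused by a later object, and the set should hold one id per thread. Whether the real manager gives the same guarantee is not shown in the excerpt.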

Callers

no direct callers detected; the enclosing test runs it via loop.run_until_complete (line 373)

Calls 3

_get_loop_manager · Method · 0.80
add · Method · 0.80
acquire_lock · Method · 0.45

Tested by

no test coverage detected