github.com/ag2ai/faststream

Function test_nowait_correct

tests/utils/test_handler_lock.py:54–74


@pytest.mark.asyncio()
async def test_nowait_correct() -> None:
    lock = MultiLock()

    async def func() -> None:
        with lock:
            await asyncio.sleep(0.01)

    async def check(task_status: TaskStatus) -> None:
        task_status.started()

        assert not lock.empty
        assert lock.qsize == 1

        await lock.wait_release()

        assert not lock.empty
        assert lock.qsize == 1

    async with anyio.create_task_group() as tg:
        tg.start_soon(func)
        await tg.start(check)
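
The test asserts that `lock.wait_release()` called without a timeout returns while the lock is still held: `empty` stays false and `qsize` stays 1 both before and after the call. The sketch below reproduces that observable behaviour with a stand-in class. `SketchMultiLock`, `holder`, and `demo` are hypothetical names introduced for illustration, and the queue-backed design is an assumption, not FastStream's actual `MultiLock` implementation.

```python
import asyncio
from contextlib import suppress
from typing import Optional, Tuple


class SketchMultiLock:
    """Hypothetical stand-in for MultiLock (assumed design, not FastStream's code)."""

    def __init__(self) -> None:
        # Assumption: each holder of the lock is tracked as one queue item.
        self._queue: "asyncio.Queue[None]" = asyncio.Queue()

    def __enter__(self) -> "SketchMultiLock":
        self._queue.put_nowait(None)
        return self

    def __exit__(self, *exc: object) -> None:
        with suppress(asyncio.QueueEmpty, ValueError):
            self._queue.get_nowait()
            self._queue.task_done()

    @property
    def empty(self) -> bool:
        return self._queue.empty()

    @property
    def qsize(self) -> int:
        return self._queue.qsize()

    async def wait_release(self, timeout: Optional[float] = None) -> None:
        # Assumption: with no timeout the call returns at once instead of blocking.
        if timeout:
            with suppress(asyncio.TimeoutError):
                await asyncio.wait_for(self._queue.join(), timeout)


async def demo() -> Tuple[Tuple[bool, int], Tuple[bool, int]]:
    lock = SketchMultiLock()

    async def holder() -> None:
        with lock:
            await asyncio.sleep(0.01)

    task = asyncio.create_task(holder())
    await asyncio.sleep(0)       # let holder enter the lock
    await lock.wait_release()    # no timeout: does not wait for holder
    during = (lock.empty, lock.qsize)
    await task                   # holder leaves the lock
    after = (lock.empty, lock.qsize)
    return during, after
```

Running `asyncio.run(demo())` samples the lock state once while `holder` still owns the lock and once after it has finished, which mirrors the two pairs of assertions in `check`.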

Callers

nothing calls this directly

Calls 2

MultiLock (class) · 0.90
start (method) · 0.45

Tested by

no test coverage detected