MCPcopy
hub / github.com/ag2ai/faststream / test_wait_correct

Function test_wait_correct

tests/utils/test_handler_lock.py:30–50  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

28
29@pytest.mark.asyncio()
30async def test_wait_correct() -> None:
31 lock = MultiLock()
32
33 async def func() -> None:
34 with lock:
35 await asyncio.sleep(0.01)
36
37 async def check(task_status: TaskStatus) -> None:
38 task_status.started()
39
40 assert not lock.empty
41 assert lock.qsize == 1
42
43 await lock.wait_release(5)
44
45 assert lock.empty
46 assert lock.qsize == 0
47
48 async with anyio.create_task_group() as tg:
49 tg.start_soon(func)
50 await tg.start(check)
51
52
53@pytest.mark.asyncio()

Callers

nothing calls this directly

Calls 2

MultiLockClass · 0.90
startMethod · 0.45

Tested by

no test coverage detected