hub / github.com/Textualize/textual / test_multiple_tasks

Function test_multiple_tasks

tests/test_rlock.py:33–56  ·  view source on GitHub ↗

Check RLock prevents other tasks from acquiring lock.

()

Source from the content-addressed store, hash-verified

async def test_multiple_tasks() -> None:
    """Check RLock prevents other tasks from acquiring lock."""
    lock = RLock()

    started: list[int] = []
    done: list[int] = []

    async def test_task(n: int) -> None:
        started.append(n)
        async with lock:
            done.append(n)

    async with lock:
        assert done == []
        task1 = asyncio.create_task(test_task(1))
        assert sorted(started) == []
        task2 = asyncio.create_task(test_task(2))
        await asyncio.sleep(0)
        assert sorted(started) == [1, 2]

    await task1
    assert 1 in done
    await task2
    assert 2 in done
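
The behavior this test checks can be illustrated with a minimal, self-contained sketch of a re-entrant asyncio lock. This is not Textual's RLock implementation; `SketchRLock`, `demo`, `worker`, and `events` are hypothetical names introduced only for illustration, and the sketch assumes that ownership is tracked per asyncio task on top of a plain `asyncio.Lock`.

```python
import asyncio
from typing import Optional


class SketchRLock:
    """Hypothetical re-entrant lock for asyncio tasks (not Textual's code)."""

    def __init__(self) -> None:
        self._lock = asyncio.Lock()
        self._owner: Optional[asyncio.Task] = None
        self._depth = 0

    async def __aenter__(self) -> None:
        task = asyncio.current_task()
        if self._owner is task:
            # Re-entry by the owning task: only bump the nesting counter.
            self._depth += 1
            return
        # Any other task waits here until the owner fully releases.
        await self._lock.acquire()
        self._owner = task
        self._depth = 1

    async def __aexit__(self, *exc_info: object) -> None:
        self._depth -= 1
        if self._depth == 0:
            # Outermost exit: give up ownership and wake one waiter.
            self._owner = None
            self._lock.release()


async def demo() -> list[str]:
    lock = SketchRLock()
    events: list[str] = []

    async def worker() -> None:
        events.append("worker started")
        async with lock:
            events.append("worker acquired")

    async with lock:
        async with lock:  # nested acquire by the same task does not block
            task = asyncio.create_task(worker())
            await asyncio.sleep(0)  # let worker run until it waits on the lock
            events.append("owner still holds lock")
    await task
    return events


events = asyncio.run(demo())
print(events)
```

As in the test above, the worker task starts as soon as the owner yields with `asyncio.sleep(0)`, but it cannot enter the locked section until the owner has left its outermost `async with` block.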

Callers

nothing calls this directly

Calls 3

RLock · Class · 0.90
test_task · Function · 0.85
sleep · Method · 0.80

Tested by

no test coverage detected
