MCPcopy
hub / github.com/python-trio/trio / test_Semaphore

Function test_Semaphore

src/trio/_tests/test_sync.py:267–312  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

265
266
267async def test_Semaphore() -> None:
268 with pytest.raises(TypeError):
269 Semaphore(1.0) # type: ignore[arg-type]
270 with pytest.raises(ValueError, match=r"^initial value must be >= 0$"):
271 Semaphore(-1)
272 s = Semaphore(1)
273 repr(s) # smoke test
274 assert s.value == 1
275 assert s.max_value is None
276 s.release()
277 assert s.value == 2
278 assert s.statistics().tasks_waiting == 0
279 s.acquire_nowait()
280 assert s.value == 1
281 with assert_checkpoints():
282 await s.acquire()
283 assert s.value == 0
284 with pytest.raises(_core.WouldBlock):
285 s.acquire_nowait()
286
287 s.release()
288 assert s.value == 1
289 with assert_checkpoints():
290 async with s:
291 assert s.value == 0
292 assert s.value == 1
293 s.acquire_nowait()
294
295 record = []
296
297 async def do_acquire(s: Semaphore) -> None:
298 record.append("started")
299 await s.acquire()
300 record.append("finished")
301
302 async with _core.open_nursery() as nursery:
303 nursery.start_soon(do_acquire, s)
304 await wait_all_tasks_blocked()
305 assert record == ["started"]
306 assert s.value == 0
307 s.release()
308 # Fairness:
309 assert s.value == 0
310 with pytest.raises(_core.WouldBlock):
311 s.acquire_nowait()
312 assert record == ["started", "finished"]
313
314
315def test_Semaphore_bounded() -> None:

Callers

nothing calls this directly

Calls 8

releaseMethod · 0.95
statisticsMethod · 0.95
acquire_nowaitMethod · 0.95
acquireMethod · 0.95
SemaphoreClass · 0.85
assert_checkpointsFunction · 0.85
wait_all_tasks_blockedFunction · 0.85
start_soonMethod · 0.80

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…