hub / github.com/MagicStack/asyncpg / test_pool_acquire_timeout

Method test_pool_acquire_timeout

tests/test_adversity.py:30–44
(self)

Source from the content-addressed store, hash-verified

    @tb.with_timeout(30.0)
    async def test_pool_acquire_timeout(self):
        pool = await self.create_pool(
            database='postgres', min_size=2, max_size=2)
        try:
            self.proxy.trigger_connectivity_loss()
            for _ in range(2):
                with self.assertRaises(asyncio.TimeoutError):
                    async with pool.acquire(timeout=0.5):
                        pass
            self.proxy.restore_connectivity()
            async with pool.acquire(timeout=0.5):
                pass
        finally:
            self.proxy.restore_connectivity()
            pool.terminate()
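
The pattern this test exercises (acquire with a timeout fails while the backend is unreachable, then succeeds once connectivity returns) can be illustrated without a database. The following is a minimal sketch only: `ToyPool`, its `reachable` flag, and `demo` are hypothetical names invented for illustration and are not asyncpg's implementation or its test proxy. It assumes only the standard-library `asyncio.wait_for`, which raises `asyncio.TimeoutError` when the wait expires.

```python
import asyncio


class ToyPool:
    """Hypothetical stand-in for a connection pool; NOT asyncpg's Pool."""

    def __init__(self, size):
        self._free = asyncio.Queue()
        for i in range(size):
            self._free.put_nowait(f"conn-{i}")
        # Assumption for illustration: a flag that simulates connectivity loss.
        self.reachable = True

    async def acquire(self, timeout):
        # Bound the whole checkout; wait_for raises asyncio.TimeoutError on expiry.
        return await asyncio.wait_for(self._checkout(), timeout)

    async def _checkout(self):
        conn = await self._free.get()
        try:
            # Pretend the connection cannot be validated while the link is down.
            while not self.reachable:
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            # On timeout, hand the connection back so the pool is not drained.
            self._free.put_nowait(conn)
            raise
        return conn

    def release(self, conn):
        self._free.put_nowait(conn)


async def demo():
    pool = ToyPool(size=2)
    outcomes = []

    pool.reachable = False  # simulate connectivity loss
    for _ in range(2):
        try:
            await pool.acquire(timeout=0.05)
            outcomes.append("acquired")
        except asyncio.TimeoutError:
            outcomes.append("timeout")

    pool.reachable = True  # simulate restored connectivity
    conn = await pool.acquire(timeout=0.05)
    outcomes.append("acquired")
    pool.release(conn)
    return outcomes


result = asyncio.run(demo())
print(result)
```

In this toy, returning the connection to the queue when the wait is cancelled is what lets the final acquire succeed; whether the real pool recovers the same way is not shown by the source above.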

Callers

nothing calls this directly

Calls 5

restore_connectivity · Method · 0.80
create_pool · Method · 0.45
acquire · Method · 0.45
terminate · Method · 0.45

Tested by

no test coverage detected