hub / github.com/MagicStack/asyncpg / test_pool_release_timeout

Method test_pool_release_timeout

tests/test_adversity.py:47–56
(self)

Source from the content-addressed store, hash-verified

@tb.with_timeout(30.0)
async def test_pool_release_timeout(self):
    pool = await self.create_pool(
        database='postgres', min_size=2, max_size=2)
    try:
        with self.assertRaises(asyncio.TimeoutError):
            async with pool.acquire(timeout=0.5):
                self.proxy.trigger_connectivity_loss()
    finally:
        self.proxy.restore_connectivity()
        pool.terminate()
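The test above expects asyncio.TimeoutError to surface when the `async with pool.acquire(timeout=0.5)` block exits after connectivity is lost, which suggests the release step is bounded by the acquire timeout. The following is a minimal, self-contained sketch of that pattern using only asyncio. `FakePool`, `_AcquireContext`, `_release`, and `release_times_out` are hypothetical names invented for illustration; this is not asyncpg's implementation and needs no database or proxy.

```python
import asyncio


class FakePool:
    """Hypothetical stand-in for a connection pool (not asyncpg's Pool)."""

    def __init__(self):
        self.connected = True

    def acquire(self, *, timeout):
        # Return an async context manager that remembers the timeout.
        return _AcquireContext(self, timeout)

    async def _release(self):
        # Assumption: releasing needs a round trip to the server, so it
        # cannot finish while the link is down.
        while not self.connected:
            await asyncio.sleep(0.01)


class _AcquireContext:
    def __init__(self, pool, timeout):
        self.pool = pool
        self.timeout = timeout

    async def __aenter__(self):
        return self.pool

    async def __aexit__(self, *exc_info):
        # Bound the release by the timeout given to acquire(); wait_for
        # raises asyncio.TimeoutError if the release does not finish in time.
        await asyncio.wait_for(self.pool._release(), self.timeout)


async def release_times_out():
    pool = FakePool()
    try:
        async with pool.acquire(timeout=0.05):
            pool.connected = False  # simulate connectivity loss while held
    except asyncio.TimeoutError:
        return True
    finally:
        pool.connected = True  # restore, mirroring the test's finally block
    return False


if __name__ == "__main__":
    print(asyncio.run(release_times_out()))
```

In this sketch the timeout is raised from `__aexit__`, so it appears at the end of the `async with` block rather than at the `acquire()` call, which is the same place the test's `assertRaises` is positioned to catch it.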

Callers

nothing calls this directly

Calls 5

restore_connectivity · Method · 0.80
create_pool · Method · 0.45
acquire · Method · 0.45
terminate · Method · 0.45

Tested by

no test coverage detected