(self)
| 28 | |
@tb.with_timeout(30.0)
async def test_pool_acquire_timeout(self):
    """Check ``pool.acquire(timeout=...)`` around a proxy connectivity loss.

    A pool (``min_size=2``, ``max_size=2``) is created against the
    ``postgres`` database.  After ``self.proxy.trigger_connectivity_loss()``
    two consecutive acquire attempts are each expected to raise
    ``asyncio.TimeoutError``.  Once ``self.proxy.restore_connectivity()``
    has been called, one more acquire with the same timeout must complete
    without raising.
    """
    acquire_timeout = 0.5
    failing_attempts = 2

    pool = await self.create_pool(
        database='postgres',
        min_size=2,
        max_size=2,
    )
    try:
        self.proxy.trigger_connectivity_loss()

        # Every acquire made after the connectivity loss is expected to
        # hit the acquire timeout.
        for _attempt in range(failing_attempts):
            with self.assertRaises(asyncio.TimeoutError):
                async with pool.acquire(timeout=acquire_timeout):
                    pass

        self.proxy.restore_connectivity()

        # After connectivity is restored the same acquire must go through;
        # any exception raised here fails the test.
        async with pool.acquire(timeout=acquire_timeout):
            pass
    finally:
        # Restore connectivity again in case the test failed before the
        # restore call above was reached, then drop the pool.
        self.proxy.restore_connectivity()
        pool.terminate()
| 45 | |
| 46 | @tb.with_timeout(30.0) |
| 47 | async def test_pool_release_timeout(self): |
nothing calls this directly
no test coverage detected