(self)
| 59 | await pool.close() |
| 60 | |
async def test_pool_02(self):
    """Run batches of concurrent workers against a fixed-size pool.

    For each task count ``n`` a fresh pool is created with
    ``min_size=5`` and ``max_size=5``; ``n`` worker coroutines are then
    gathered concurrently.  Each worker acquires a connection (with a
    5 second acquire timeout), checks that ``SELECT 1`` yields ``1``,
    and releases the connection back to the pool.
    """
    # A tuple rather than a set literal, so the sub-tests run in the
    # order they are listed here.
    for n in (1, 3, 5, 10, 20, 100):
        with self.subTest(tasksnum=n):
            async with self.create_pool(database='postgres',
                                        min_size=5, max_size=5) as pool:

                async def worker():
                    con = await pool.acquire(timeout=5)
                    try:
                        self.assertEqual(
                            await con.fetchval('SELECT 1'), 1)
                    finally:
                        # Hand the connection back even when the query
                        # or the assertion raises, so a failing worker
                        # does not leave it checked out.
                        await pool.release(con)

                tasks = [worker() for _ in range(n)]
                await asyncio.gather(*tasks)
| 74 | |
| 75 | async def test_pool_03(self): |
| 76 | pool = await self.create_pool(database='postgres', |
nothing calls this directly
no test coverage detected