MCPcopy
hub / github.com/MagicStack/asyncpg / test_pool_04

Method test_pool_04

tests/test_pool.py:86–107  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

84 del con
85
86 async def test_pool_04(self):
87 pool = await self.create_pool(database='postgres',
88 min_size=1, max_size=1)
89
90 con = await pool.acquire(timeout=POOL_NOMINAL_TIMEOUT)
91
92 # Manual termination of pool connections releases the
93 # pool item immediately.
94 con.terminate()
95 self.assertIsNone(pool._holders[0]._con)
96 self.assertIsNone(pool._holders[0]._in_use)
97
98 con = await pool.acquire(timeout=POOL_NOMINAL_TIMEOUT)
99 self.assertEqual(await con.fetchval('SELECT 1'), 1)
100
101 await con.close()
102 self.assertIsNone(pool._holders[0]._con)
103 self.assertIsNone(pool._holders[0]._in_use)
104 # Calling release should not hurt.
105 await pool.release(con)
106
107 pool.terminate()
108
109 async def test_pool_05(self):
110 for n in {1, 3, 5, 10, 20, 100}:

Callers

nothing calls this directly

Calls 6

create_poolMethod · 0.45
acquireMethod · 0.45
terminateMethod · 0.45
fetchvalMethod · 0.45
closeMethod · 0.45
releaseMethod · 0.45

Tested by

no test coverage detected