hub / github.com/MagicStack/asyncpg / test_pool_remote_close

Method test_pool_remote_close

tests/test_pool.py:982–1005
(self)

Source from the content-addressed store, hash-verified

        await pool.close()

    async def test_pool_remote_close(self):
        pool = await self.create_pool(min_size=1, max_size=1)
        backend_pid_fut = self.loop.create_future()

        async def worker():
            async with pool.acquire() as conn:
                pool_backend_pid = await conn.fetchval(
                    'SELECT pg_backend_pid()')
                backend_pid_fut.set_result(pool_backend_pid)
                await asyncio.sleep(0.2)

        task = self.loop.create_task(worker())
        try:
            conn = await self.connect()
            backend_pid = await backend_pid_fut
            await conn.execute('SELECT pg_terminate_backend($1)', backend_pid)
        finally:
            await conn.close()

        await task

        # Check that connection_lost has released the pool holder.
        conn = await pool.acquire(timeout=0.1)
        await pool.release(conn)


@unittest.skipIf(os.environ.get('PGHOST'), 'unmanaged cluster')
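
The test's final check is that a single-slot pool can hand out a connection again after the server has killed the one that was checked out. The sketch below illustrates that property with a toy model that needs no database. It is not asyncpg's implementation: `ToyPool`, `ToyConnection`, `terminate_remotely` and `_connection_lost` are hypothetical names invented for this example, and the slot is modelled with a plain `asyncio.Semaphore`.

```python
import asyncio


class ToyConnection:
    """Hypothetical stand-in for a pooled connection (not asyncpg's class)."""

    def __init__(self, on_lost):
        self._on_lost = on_lost
        self.closed = False

    def terminate_remotely(self):
        # Simulates the server ending the session, as pg_terminate_backend would.
        self.closed = True
        self._on_lost(self)


class ToyPool:
    """Hypothetical single-slot pool that frees its slot when the connection is lost."""

    def __init__(self):
        self._slot = asyncio.Semaphore(1)
        self._held = None

    async def acquire(self, timeout=None):
        # Raises asyncio.TimeoutError if the slot is not freed in time.
        await asyncio.wait_for(self._slot.acquire(), timeout)
        self._held = ToyConnection(self._connection_lost)
        return self._held

    def release(self, conn):
        # Idempotent: releasing a connection that is no longer held does nothing.
        if self._held is conn:
            self._held = None
            self._slot.release()

    def _connection_lost(self, conn):
        # Assumption for this sketch: losing the connection frees the slot at once.
        self.release(conn)


async def demo():
    pool = ToyPool()
    conn = await pool.acquire()
    conn.terminate_remotely()   # connection dies while still checked out
    pool.release(conn)          # the holder's late release must be harmless
    replacement = await pool.acquire(timeout=0.1)
    ok = replacement is not conn and not replacement.closed
    pool.release(replacement)
    return ok


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If `_connection_lost` did not free the slot and the holder never released it, the second `acquire(timeout=0.1)` would time out, which is the failure mode the real test guards against.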

Callers

nothing calls this directly

Calls 6

connect · Method · 0.95
create_pool · Method · 0.45
execute · Method · 0.45
close · Method · 0.45
acquire · Method · 0.45
release · Method · 0.45

Tested by

no test coverage detected