MCPcopy
hub / github.com/MagicStack/asyncpg / test_prepare_07_interrupted_terminate

Method test_prepare_07_interrupted_terminate

tests/test_prepare.py:103–118  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

101 await self.con.close()
102
103 async def test_prepare_07_interrupted_terminate(self):
104 stmt = await self.con.prepare('''SELECT pg_sleep(10)''')
105 fut = self.loop.create_task(stmt.fetchval())
106
107 await asyncio.sleep(0.2)
108
109 self.assertFalse(self.con.is_closed())
110 self.con.terminate()
111 self.assertTrue(self.con.is_closed())
112
113 with self.assertRaisesRegex(asyncpg.ConnectionDoesNotExistError,
114 'closed in the middle'):
115 await fut
116
117 # Test that it's OK to call terminate again
118 self.con.terminate()
119
120 async def test_prepare_08_big_result(self):
121 stmt = await self.con.prepare('select generate_series(0,10000)')

Callers

nothing calls this directly

Calls 4

prepareMethod · 0.80
is_closedMethod · 0.80
fetchvalMethod · 0.45
terminateMethod · 0.45

Tested by

no test coverage detected