MCPcopy Index your code
hub / github.com/MagicStack/asyncpg / test_executemany_timeout

Method test_executemany_timeout

tests/test_execute.py:264–270  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

262 self.assertEqual(result, [])
263
264 async def test_executemany_timeout(self):
265 with self.assertRaises(asyncio.TimeoutError):
266 await self.con.executemany('''
267 INSERT INTO exmany VALUES(pg_sleep(0.1) || $1, $2)
268 ''', [('a' * 32768, x) for x in range(128)], timeout=0.5)
269 result = await self.con.fetch('SELECT * FROM exmany')
270 self.assertEqual(result, [])
271
272 async def test_executemany_timeout_flow_control(self):
273 event = asyncio.Event()

Callers

nothing calls this directly

Calls 2

executemanyMethod · 0.45
fetchMethod · 0.45

Tested by

no test coverage detected