(self)
| 262 | self.assertEqual(result, []) |
| 263 | |
async def test_executemany_timeout(self):
    """Check that ``exmany`` is empty after ``executemany`` times out.

    The statement calls ``pg_sleep(0.1)`` and is submitted with 128
    argument tuples under a 0.5 second timeout; the call is expected to
    raise ``asyncio.TimeoutError``. A follow-up ``SELECT`` must then
    return no rows.
    """
    insert_sql = '''
        INSERT INTO exmany VALUES(pg_sleep(0.1) || $1, $2)
    '''
    # Each tuple carries a 32 KiB string as its first argument.
    # NOTE(review): presumably sized to make the batch large on the
    # wire -- confirm against the sibling flow-control test.
    payload = 'a' * 32768
    arg_rows = [(payload, idx) for idx in range(128)]

    with self.assertRaises(asyncio.TimeoutError):
        await self.con.executemany(insert_sql, arg_rows, timeout=0.5)

    # The table must hold no rows once the call has timed out.
    remaining = await self.con.fetch('SELECT * FROM exmany')
    self.assertEqual(remaining, [])
| 271 | |
| 272 | async def test_executemany_timeout_flow_control(self): |
| 273 | event = asyncio.Event() |
nothing calls this directly
no test coverage detected