(self)
| 320 | self.assertEqual(await self.con.fetchval('SELECT 1'), 1) |
| 321 | |
async def test_copy_from_query_timeout_1(self):
    # Scenario: copy the result of a query into an output callable that
    # sleeps 0.05s on every call, with timeout=0.10 on the copy itself.
    # The copy is expected to fail with asyncio.TimeoutError, and the
    # connection is expected to answer a trivial query afterwards.

    async def slow_sink(data):
        # The payload is ignored on purpose; only the delay matters.
        await asyncio.sleep(0.05)

    # 5000 rows, each a 500-character run of 'a'.  The SQL text is kept
    # verbatim, hence no indentation inside the literal.
    query = '''
SELECT
repeat('a', 500)
FROM
generate_series(1, 5000) AS i
'''

    pending_copy = self.con.copy_from_query(
        query, output=slow_sink, timeout=0.10)

    # Run the copy as a task on the test's event loop.
    copy_task = self.loop.create_task(pending_copy)

    with self.assertRaises(asyncio.TimeoutError):
        await copy_task

    # After the timeout the same connection must still be usable.
    probe = await self.con.fetchval('SELECT 1')
    self.assertEqual(probe, 1)
| 339 | |
| 340 | async def test_copy_from_query_timeout_2(self): |
| 341 | async def writer(data): |
nothing calls this directly
no test coverage detected