(self)
| 338 | self.assertEqual(await self.con.fetchval('SELECT 1'), 1) |
| 339 | |
async def test_copy_from_query_timeout_2(self):
    """Check that a stalled output writer makes copy_from_query time out.

    The writer passed as ``output`` sleeps for 10 seconds, far longer
    than the 0.10 second ``timeout`` given to ``copy_from_query``, so
    awaiting the copy is expected to raise ``asyncio.TimeoutError``.
    Afterwards a trivial ``SELECT 1`` is issued on the same connection
    (presumably to confirm the connection is still usable after the
    timeout -- confirm against the driver's documented guarantees).
    """
    async def stalled_sink(data):
        # Never consumes ``data``; it only blocks well past the copy
        # timeout.  A TimeoutError surfacing here is passed through
        # untouched; finishing the sleep normally means the timeout
        # never fired, which is a test failure.
        try:
            await asyncio.sleep(10)
        except asyncio.TimeoutError:
            raise
        self.fail('TimeoutError not raised')

    # Build the copy coroutine first, then schedule it on the test loop.
    copy_coro = self.con.copy_from_query('''
        SELECT
            repeat('a', 500)
        FROM
            generate_series(1, 5000) AS i
    ''', output=stalled_sink, timeout=0.10)
    pending_copy = self.loop.create_task(copy_coro)

    with self.assertRaises(asyncio.TimeoutError):
        await pending_copy

    # Run a follow-up query on the same connection after the timeout.
    follow_up = await self.con.fetchval('SELECT 1')
    self.assertEqual(follow_up, 1)
| 362 | |
| 363 | |
| 364 | class TestCopyTo(tb.ConnectedTestCase): |
nothing calls this directly
no test coverage detected