MCPcopy
hub / github.com/MagicStack/asyncpg / test_copy_from_query_timeout_1

Method test_copy_from_query_timeout_1

tests/test_copy.py:322–338  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

320 self.assertEqual(await self.con.fetchval('SELECT 1'), 1)
321
322 async def test_copy_from_query_timeout_1(self):
323 async def writer(data):
324 await asyncio.sleep(0.05)
325
326 coro = self.con.copy_from_query('''
327 SELECT
328 repeat('a', 500)
329 FROM
330 generate_series(1, 5000) AS i
331 ''', output=writer, timeout=0.10)
332
333 task = self.loop.create_task(coro)
334
335 with self.assertRaises(asyncio.TimeoutError):
336 await task
337
338 self.assertEqual(await self.con.fetchval('SELECT 1'), 1)
339
340 async def test_copy_from_query_timeout_2(self):
341 async def writer(data):

Callers

nothing calls this directly

Calls 2

copy_from_queryMethod · 0.45
fetchvalMethod · 0.45

Tested by

no test coverage detected