MCPcopy
hub / github.com/MagicStack/asyncpg / test_copy_to_table_large_rows

Method test_copy_to_table_large_rows

tests/test_copy.py:462–487  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

460 await self.con.execute('DROP TABLE public.copytab')
461
async def test_copy_to_table_large_rows(self):
    """Check that copy_to_table() accepts very large rows from an async source.

    A two-column text table is created, then 100 rows are streamed in from
    a local async iterator.  Each row is a single bytes chunk of about
    2 MB: two 1,000,000-byte fields separated by a tab and terminated by
    a newline.  The call is expected to return the status 'COPY 100'.
    The table is dropped afterwards whether or not the copy succeeds.
    """
    total_rows = 100
    repeats = 500000  # each two-byte pattern is repeated this many times

    class _Source:
        """Async iterator that yields ``total_rows`` identical large rows."""

        def __init__(self):
            # Number of rows handed out so far.
            self.rowcount = 0

        def __aiter__(self):
            return self

        async def __anext__(self):
            # Guard clause: stop once the quota of rows has been produced.
            if self.rowcount >= total_rows:
                raise StopAsyncIteration
            self.rowcount += 1
            return b''.join(
                (b'a1' * repeats, b'\t', b'b1' * repeats, b'\n')
            )

    await self.con.execute('''
        CREATE TABLE copytab(a text, b text);
    ''')

    try:
        res = await self.con.copy_to_table('copytab', source=_Source())
        self.assertEqual(res, 'COPY 100')
    finally:
        # Always clean up so later tests can recreate the table.
        await self.con.execute('DROP TABLE copytab')
488
489 async def test_copy_to_table_from_bytes_like(self):
490 await self.con.execute('''

Callers

nothing calls this directly

Calls 3

_Source (Class) · 0.85
execute (Method) · 0.45
copy_to_table (Method) · 0.45

Tested by

no test coverage detected