hub / github.com/MagicStack/asyncpg / test_copy_from_query_basics

Method test_copy_from_query_basics

tests/test_copy.py:120–144
(self)

Source from the content-addressed store, hash-verified

        await self.con.execute('DROP TABLE public.copytab')

    async def test_copy_from_query_basics(self):
        f = io.BytesIO()

        res = await self.con.copy_from_query('''
            SELECT
                repeat('a' || i::text, 500000),
                repeat('b' || i::text, 500000)
            FROM
                generate_series(1, 5) AS i
        ''', output=f)

        self.assertEqual(res, 'COPY 5')

        output = f.getvalue().decode().split('\n')
        self.assertEqual(
            output,
            [
                'a1' * 500000 + '\t' + 'b1' * 500000,
                'a2' * 500000 + '\t' + 'b2' * 500000,
                'a3' * 500000 + '\t' + 'b3' * 500000,
                'a4' * 500000 + '\t' + 'b4' * 500000,
                'a5' * 500000 + '\t' + 'b5' * 500000,
                ''
            ]
        )

    async def test_copy_from_query_with_args(self):
        f = io.BytesIO()
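
The expected list in the test reflects PostgreSQL's default COPY text format: one row per line, columns separated by a tab, and a newline after every row, which is why splitting on '\n' leaves a trailing empty string. The sketch below reproduces that shape without a database. The helper name `parse_copy_text` and the scaled-down repeat count `n` are assumptions made for illustration and are not part of asyncpg or its test suite; the helper also assumes values contain no escapes or NULL markers.

```python
import io


def parse_copy_text(data: bytes) -> list:
    """Split COPY text-format output into rows of column values.

    Hypothetical helper: assumes the default tab delimiter and
    values that contain no backslash escapes or NULL markers.
    """
    lines = data.decode().split('\n')
    # Every row ends with a newline, so the split leaves a trailing ''.
    if lines and lines[-1] == '':
        lines.pop()
    return [line.split('\t') for line in lines]


# Simulate the bytes the server would stream for the test's query,
# using a small repeat count instead of 500000.
n = 3
buf = io.BytesIO()
for i in range(1, 6):
    row = ('a%d' % i) * n + '\t' + ('b%d' % i) * n + '\n'
    buf.write(row.encode())

rows = parse_copy_text(buf.getvalue())
```

Here `rows` holds five two-column rows, matching the `COPY 5` status the test asserts, while the raw split keeps the extra empty element that the test lists explicitly.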

Callers

nothing calls this directly

Calls 1

copy_from_query · Method · 0.45

Tested by

no test coverage detected