MCPcopy
hub / github.com/MagicStack/asyncpg / test_copy_from_query_to_sink

Method test_copy_from_query_to_sink

tests/test_copy.py:234–259  ·  view source on GitHub ↗
(self)

Source from the content-addressed store, hash-verified

232 ''', 3, output=1)
233
234 async def test_copy_from_query_to_sink(self):
235 with tempfile.NamedTemporaryFile() as f:
236 async def writer(data):
237 # Sleeping here to simulate slow output sink to test
238 # backpressure.
239 await asyncio.sleep(0.05)
240 f.write(data)
241
242 await self.con.copy_from_query('''
243 SELECT
244 repeat('a', 500)
245 FROM
246 generate_series(1, 5000) AS i
247 ''', output=writer)
248
249 f.seek(0)
250
251 output = f.read().decode().split('\n')
252 self.assertEqual(
253 output,
254 [
255 'a' * 500
256 ] * 5000 + ['']
257 )
258
259 self.assertEqual(await self.con.fetchval('SELECT 1'), 1)
260
261 async def test_copy_from_query_cancellation_explicit(self):
262 async def writer(data):

Callers

nothing calls this directly

Calls 2

copy_from_queryMethod · 0.45
fetchvalMethod · 0.45

Tested by

no test coverage detected