MCPcopy
hub / github.com/MagicStack/asyncpg / test_custom_codec_override_text

Method test_custom_codec_override_text

tests/test_codecs.py:1440–1485  ·  view source on GitHub ↗

Test overriding core codecs.

(self)

Source from the content-addressed store, hash-verified

1438 await conn.close()
1439
async def test_custom_codec_override_text(self):
    """Test overriding core codecs with text-format custom codecs.

    Registers a JSON codec for ``pg_catalog.json`` and checks that it is
    applied to the scalar type, to ``json[]`` and to a domain created
    over ``json``.  Then registers a pass-through codec for
    ``pg_catalog.uuid`` and checks that a UUID string round-trips
    unchanged.
    """
    import json

    def _passthrough(value):
        # Identity codec: hand the text representation through untouched.
        return value

    conn = await self.connect()
    try:
        await conn.set_type_codec(
            'json', encoder=json.dumps, decoder=json.loads,
            schema='pg_catalog', format='text'
        )

        data = {'foo': 'bar', 'spam': 1}
        res = await conn.fetchval('SELECT $1::json', data)
        self.assertEqual(data, res)

        # The override must also apply to arrays of the type.
        res = await conn.fetchval('SELECT $1::json[]', [data])
        self.assertEqual([data], res)

        await conn.execute('CREATE DOMAIN my_json AS json')

        # ... and to domains defined over the type.
        res = await conn.fetchval('SELECT $1::my_json', data)
        self.assertEqual(data, res)

        await conn.set_type_codec(
            'uuid', encoder=_passthrough, decoder=_passthrough,
            schema='pg_catalog', format='text'
        )

        data = '14058ad9-0118-4b7e-ac15-01bc13e2ccd1'
        res = await conn.fetchval('SELECT $1::uuid', data)
        self.assertEqual(res, data)
    finally:
        try:
            await conn.execute('DROP DOMAIN IF EXISTS my_json')
        finally:
            # Close the connection even if the domain cleanup fails,
            # so a failing DROP cannot leak the connection.
            await conn.close()
1486
1487 async def test_custom_codec_override_tuple(self):
1488 """Test overriding core codecs."""

Callers

nothing calls this directly

Calls 5

set_type_codecMethod · 0.80
connectMethod · 0.45
fetchvalMethod · 0.45
executeMethod · 0.45
closeMethod · 0.45

Tested by

no test coverage detected