MCPcopy
hub / github.com/MagicStack/asyncpg / test_custom_codec_binary

Method test_custom_codec_binary

tests/test_codecs.py:1240–1318  ·  view source on GitHub ↗

Test encoding/decoding using a custom codec in binary mode.

(self)

Source from the content-addressed store, hash-verified

1238 ''')
1239
1240 async def test_custom_codec_binary(self):
1241 """Test encoding/decoding using a custom codec in binary mode."""
1242 await self.con.execute('''
1243 CREATE EXTENSION IF NOT EXISTS hstore
1244 ''')
1245
1246 longstruct = struct.Struct('!L')
1247 ulong_unpack = lambda b: longstruct.unpack_from(b)[0]
1248 ulong_pack = longstruct.pack
1249
1250 def hstore_decoder(data):
1251 result = {}
1252 n = ulong_unpack(data)
1253 view = memoryview(data)
1254 ptr = 4
1255
1256 for i in range(n):
1257 klen = ulong_unpack(view[ptr:ptr + 4])
1258 ptr += 4
1259 k = bytes(view[ptr:ptr + klen]).decode()
1260 ptr += klen
1261 vlen = ulong_unpack(view[ptr:ptr + 4])
1262 ptr += 4
1263 if vlen == -1:
1264 v = None
1265 else:
1266 v = bytes(view[ptr:ptr + vlen]).decode()
1267 ptr += vlen
1268
1269 result[k] = v
1270
1271 return result
1272
1273 def hstore_encoder(obj):
1274 buffer = bytearray(ulong_pack(len(obj)))
1275
1276 for k, v in obj.items():
1277 kenc = k.encode()
1278 buffer += ulong_pack(len(kenc)) + kenc
1279
1280 if v is None:
1281 buffer += b'\xFF\xFF\xFF\xFF' # -1
1282 else:
1283 venc = v.encode()
1284 buffer += ulong_pack(len(venc)) + venc
1285
1286 return buffer
1287
1288 try:
1289 await self.con.set_type_codec('hstore', encoder=hstore_encoder,
1290 decoder=hstore_decoder,
1291 format='binary')
1292
1293 st = await self.con.prepare('''
1294 SELECT $1::hstore AS result
1295 ''')
1296
1297 res = await st.fetchrow({'ham': 'spam'})

Callers

nothing calls this directly

Calls 6

set_type_codecMethod · 0.80
prepareMethod · 0.80
get_parametersMethod · 0.80
get_attributesMethod · 0.80
executeMethod · 0.45
fetchrowMethod · 0.45

Tested by

no test coverage detected