Test encoding/decoding using a custom codec in binary mode.
(self)
| 1238 | ''') |
| 1239 | |
| 1240 | async def test_custom_codec_binary(self): |
| 1241 | """Test encoding/decoding using a custom codec in binary mode.""" |
| 1242 | await self.con.execute(''' |
| 1243 | CREATE EXTENSION IF NOT EXISTS hstore |
| 1244 | ''') |
| 1245 | |
| 1246 | longstruct = struct.Struct('!L') |
| 1247 | ulong_unpack = lambda b: longstruct.unpack_from(b)[0] |
| 1248 | ulong_pack = longstruct.pack |
| 1249 | |
| 1250 | def hstore_decoder(data): |
| 1251 | result = {} |
| 1252 | n = ulong_unpack(data) |
| 1253 | view = memoryview(data) |
| 1254 | ptr = 4 |
| 1255 | |
| 1256 | for i in range(n): |
| 1257 | klen = ulong_unpack(view[ptr:ptr + 4]) |
| 1258 | ptr += 4 |
| 1259 | k = bytes(view[ptr:ptr + klen]).decode() |
| 1260 | ptr += klen |
| 1261 | vlen = ulong_unpack(view[ptr:ptr + 4]) |
| 1262 | ptr += 4 |
| 1263 | if vlen == -1: |
| 1264 | v = None |
| 1265 | else: |
| 1266 | v = bytes(view[ptr:ptr + vlen]).decode() |
| 1267 | ptr += vlen |
| 1268 | |
| 1269 | result[k] = v |
| 1270 | |
| 1271 | return result |
| 1272 | |
| 1273 | def hstore_encoder(obj): |
| 1274 | buffer = bytearray(ulong_pack(len(obj))) |
| 1275 | |
| 1276 | for k, v in obj.items(): |
| 1277 | kenc = k.encode() |
| 1278 | buffer += ulong_pack(len(kenc)) + kenc |
| 1279 | |
| 1280 | if v is None: |
| 1281 | buffer += b'\xFF\xFF\xFF\xFF' # -1 |
| 1282 | else: |
| 1283 | venc = v.encode() |
| 1284 | buffer += ulong_pack(len(venc)) + venc |
| 1285 | |
| 1286 | return buffer |
| 1287 | |
| 1288 | try: |
| 1289 | await self.con.set_type_codec('hstore', encoder=hstore_encoder, |
| 1290 | decoder=hstore_decoder, |
| 1291 | format='binary') |
| 1292 | |
| 1293 | st = await self.con.prepare(''' |
| 1294 | SELECT $1::hstore AS result |
| 1295 | ''') |
| 1296 | |
| 1297 | res = await st.fetchrow({'ham': 'spam'}) |
nothing calls this directly
no test coverage detected