| 35 | return self.key.id_hash(data) |
| 36 | |
def format(
    self,
    id: bytes,
    meta: dict,
    data: bytes,
    compress: bool = True,
    size: int = None,
    ctype: int = None,
    clevel: int = None,
    ro_type: str = None,
) -> bytes:
    """Build the serialized form of one object: packed header + encrypted meta + encrypted data.

    The metadata dict is extended with "type" and "csize" (and, when the caller supplies
    already-compressed data, with "size", "ctype" and "clevel"), msgpack-packed and encrypted.
    The payload is compressed (unless compress is False) and encrypted. The header records the
    length and the xxh64 digest of each encrypted part.

    :param id: object id, handed to self.key.encrypt for both the meta and the data part.
    :param meta: metadata dict. NOTE: it is modified in place ("type" is always set on it).
    :param data: payload as bytes or memoryview; uncompressed if compress is True,
                 already compressed otherwise.
    :param compress: if True, run the payload through self.compressor; if False, the payload
                     is taken as is and size, ctype and clevel must all be given.
    :param size: with compress=True: optional, but must equal len(data) when given;
                 with compress=False: required int, stored as meta["size"].
    :param ctype: required int when compress is False, stored as meta["ctype"].
    :param clevel: required int when compress is False, stored as meta["clevel"].
    :param ro_type: object type string stored as meta["type"]; must be a str other than
                    ROBJ_DONTCARE (the None default is rejected).
    :return: hdr_packed + meta_encrypted + data_encrypted
    :raises AssertionError: if an argument check fails (checks are plain assert statements,
                            so they do not run under python -O).
    """
    # Validate all arguments before writing anything into meta, so that a failed check
    # never leaves the caller's dict partially modified.
    assert isinstance(ro_type, str)
    assert ro_type != ROBJ_DONTCARE
    assert isinstance(id, bytes)
    assert isinstance(meta, dict)
    assert isinstance(data, (bytes, memoryview))
    assert compress or (size is not None and ctype is not None and clevel is not None)
    if compress:
        assert size is None or size == len(data)
    else:
        assert isinstance(size, int)
        assert isinstance(ctype, int)
        assert isinstance(clevel, int)

    meta["type"] = ro_type
    if compress:
        # the compressor returns the metadata to use from here on together with the compressed payload
        meta, data_compressed = self.compressor.compress(meta, data)
    else:
        meta["size"] = size
        meta["ctype"] = ctype
        meta["clevel"] = clevel
        data_compressed = data  # is already compressed, is NOT prefixed by type/level bytes
    meta["csize"] = len(data_compressed)

    data_encrypted = self.key.encrypt(id, data_compressed)
    meta_packed = msgpack.packb(meta)
    meta_encrypted = self.key.encrypt(id, meta_packed)

    # header: lengths and xxh64 digests of the two encrypted parts, in (meta, data) order
    hdr = self.ObjHeader(
        len(meta_encrypted), len(data_encrypted), xxh64(meta_encrypted).digest(), xxh64(data_encrypted).digest()
    )
    hdr_packed = self.obj_header.pack(*hdr)
    return hdr_packed + meta_encrypted + data_encrypted
| 75 | |
| 76 | def parse_meta(self, id: bytes, cdata: bytes, ro_type: str) -> dict: |
| 77 | # when calling parse_meta, enough cdata needs to be supplied to contain completely the |