Signed serializer.
| 18 | |
| 19 | |
| 20 | class SecureSerializer: |
| 21 | """Signed serializer.""" |
| 22 | |
| 23 | def __init__(self, key=None, cert=None, cert_store=None, |
| 24 | digest=DEFAULT_SECURITY_DIGEST, serializer='json'): |
| 25 | self._key = key |
| 26 | self._cert = cert |
| 27 | self._cert_store = cert_store |
| 28 | self._digest = get_digest_algorithm(digest) |
| 29 | self._serializer = serializer |
| 30 | |
| 31 | def serialize(self, data): |
| 32 | """Serialize data structure into string.""" |
| 33 | assert self._key is not None |
| 34 | assert self._cert is not None |
| 35 | with reraise_errors('Unable to serialize: {0!r}', (Exception,)): |
| 36 | content_type, content_encoding, body = dumps( |
| 37 | data, serializer=self._serializer) |
| 38 | |
| 39 | # What we sign is the serialized body, not the body itself. |
| 40 | # this way the receiver doesn't have to decode the contents |
| 41 | # to verify the signature (and thus avoiding potential flaws |
| 42 | # in the decoding step). |
| 43 | body = ensure_bytes(body) |
| 44 | return self._pack(body, content_type, content_encoding, |
| 45 | signature=self._key.sign(body, self._digest), |
| 46 | signer=self._cert.get_id()) |
| 47 | |
| 48 | def deserialize(self, data): |
| 49 | """Deserialize data structure from string.""" |
| 50 | assert self._cert_store is not None |
| 51 | with reraise_errors('Unable to deserialize: {0!r}', (Exception,)): |
| 52 | payload = self._unpack(data) |
| 53 | signature, signer, body = (payload['signature'], |
| 54 | payload['signer'], |
| 55 | payload['body']) |
| 56 | self._cert_store[signer].verify(body, signature, self._digest) |
| 57 | return loads(body, payload['content_type'], |
| 58 | payload['content_encoding'], force=True) |
| 59 | |
| 60 | def _pack(self, body, content_type, content_encoding, signer, signature, |
| 61 | sep=DEFAULT_SEPARATOR): |
| 62 | fields = sep.join( |
| 63 | ensure_bytes(s) for s in [b64encode(signer), b64encode(signature), |
| 64 | content_type, content_encoding, body] |
| 65 | ) |
| 66 | return b64encode(fields) |
| 67 | |
| 68 | def _unpack(self, payload, sep=DEFAULT_SEPARATOR): |
| 69 | raw_payload = b64decode(ensure_bytes(payload)) |
| 70 | v = raw_payload.split(sep, maxsplit=4) |
| 71 | return { |
| 72 | 'signer': b64decode(v[0]), |
| 73 | 'signature': b64decode(v[1]), |
| 74 | 'content_type': bytes_to_str(v[2]), |
| 75 | 'content_encoding': bytes_to_str(v[3]), |
| 76 | 'body': v[4], |
| 77 | } |
no outgoing calls
searching dependent graphs…