Extract JWE Compact Serialization. :param s: JWE Compact Serialization as bytes :param key: Private key used to decrypt payload (optionally a tuple of kid and key, in which case the kid is ignored and only the key is used) :param decode: Function to decode payload data :param sender_key: Sender's public key in case JWEAlgorithmWithTagAwareKeyAgreement is used
(self, s, key, decode=None, sender_key=None)
| 445 | return self.serialize_compact(header, payload, key, sender_key) |
| 446 | |
| 447 | def deserialize_compact(self, s, key, decode=None, sender_key=None): |
| 448 | """Extract JWE Compact Serialization. |
| 449 | |
| 450 | :param s: JWE Compact Serialization as bytes |
| 451 | :param key: Private key used to decrypt payload |
| 452 | (optionally can be a tuple of kid and essentially key) |
| 453 | :param decode: Function to decode payload data |
| 454 | :param sender_key: Sender's public key in case |
| 455 | JWEAlgorithmWithTagAwareKeyAgreement is used |
| 456 | :return: dict with `header` and `payload` keys where `header` value is |
| 457 | a dict containing protected header fields |
| 458 | """ |
| 459 | try: |
| 460 | s = to_bytes(s) |
| 461 | protected_s, ek_s, iv_s, ciphertext_s, tag_s = s.rsplit(b".") |
| 462 | except ValueError as exc: |
| 463 | raise DecodeError("Not enough segments") from exc |
| 464 | |
| 465 | protected = extract_header(protected_s, DecodeError) |
| 466 | ek = extract_segment(ek_s, DecodeError, "encryption key") |
| 467 | iv = extract_segment(iv_s, DecodeError, "initialization vector") |
| 468 | ciphertext = extract_segment(ciphertext_s, DecodeError, "ciphertext") |
| 469 | tag = extract_segment(tag_s, DecodeError, "authentication tag") |
| 470 | |
| 471 | alg = self.get_header_alg(protected) |
| 472 | enc = self.get_header_enc(protected) |
| 473 | zip_alg = self.get_header_zip(protected) |
| 474 | |
| 475 | self._validate_sender_key(sender_key, alg) |
| 476 | self._validate_private_headers(protected, alg) |
| 477 | |
| 478 | if isinstance(key, tuple) and len(key) == 2: |
| 479 | # Ignore separately provided kid, extract essentially key only |
| 480 | key = key[1] |
| 481 | |
| 482 | key = prepare_key(alg, protected, key) |
| 483 | |
| 484 | if sender_key is not None: |
| 485 | sender_key = alg.prepare_key(sender_key) |
| 486 | |
| 487 | if isinstance(alg, JWEAlgorithmWithTagAwareKeyAgreement): |
| 488 | # For a JWE algorithm with tag-aware key agreement: |
| 489 | if alg.key_size is not None: |
| 490 | # In case key agreement with key wrapping mode is used: |
| 491 | # Provide authentication tag to .unwrap method |
| 492 | cek = alg.unwrap(enc, ek, protected, key, sender_key, tag) |
| 493 | else: |
| 494 | # Otherwise, don't provide authentication tag to .unwrap method |
| 495 | cek = alg.unwrap(enc, ek, protected, key, sender_key) |
| 496 | else: |
| 497 | # For any other JWE algorithm: |
| 498 | # Don't provide authentication tag to .unwrap method |
| 499 | cek = alg.unwrap(enc, ek, protected, key) |
| 500 | |
| 501 | aad = to_bytes(protected_s, "ascii") |
| 502 | msg = enc.decrypt(ciphertext, aad, iv, tag, cek) |
| 503 | |
| 504 | if zip_alg: |