(
key,
issuer,
audience,
subject=None,
issued_at=None,
expires_at=None,
claims=None,
header=None,
**kwargs,
)
| 7 | |
| 8 | |
def sign_jwt_bearer_assertion(
    key,
    issuer,
    audience,
    subject=None,
    issued_at=None,
    expires_at=None,
    claims=None,
    header=None,
    **kwargs,
):
    """Build and sign a JWT bearer assertion.

    The payload always carries ``iss``, ``aud``, ``iat`` and ``exp``; ``sub``
    is added only when ``subject`` is truthy. Entries in ``claims`` are merged
    last, so they override any of the generated values.

    :param key: signing key material, passed through ``import_any_key``.
    :param issuer: value for the ``iss`` claim.
    :param audience: value for the ``aud`` claim.
    :param subject: optional value for the ``sub`` claim.
    :param issued_at: value for ``iat``; any falsy value (``None`` or ``0``)
        is replaced by the current Unix time.
    :param expires_at: value for ``exp``; when ``None`` it is computed as
        ``issued_at + expires_in``.
    :param claims: optional mapping of extra claims merged into the payload.
    :param header: optional JOSE header mapping. It is copied, never mutated.
    :param kwargs: ``alg`` (overrides ``header["alg"]`` when truthy) and
        ``expires_in`` (lifetime in seconds, default 3600) are consumed here.
    :raises ValueError: if no ``alg`` is given via ``header`` or ``alg``.
    :return: whatever ``jwt.encode`` returns for the header and payload.
    """
    # Work on a copy: injecting ``alg`` below must not leak into the dict the
    # caller handed in (it may be reused for other requests).
    header = {} if header is None else dict(header)

    alg = kwargs.pop("alg", None)
    if alg:
        header["alg"] = alg
    if "alg" not in header:
        raise ValueError("Missing 'alg' in header")

    payload = {"iss": issuer, "aud": audience}

    # subject is not required in Google service
    if subject:
        payload["sub"] = subject

    if not issued_at:
        issued_at = int(time.time())

    # Always pop ``expires_in`` so it is consumed even when ``expires_at``
    # was given explicitly.
    expires_in = kwargs.pop("expires_in", 3600)
    if expires_at is None:
        expires_at = issued_at + expires_in

    payload["iat"] = issued_at
    payload["exp"] = expires_at

    # Caller-supplied claims win over the generated ones.
    if claims:
        payload.update(claims)

    return jwt.encode(header, payload, import_any_key(key), algorithms=[header["alg"]])
| 48 | |
| 49 | |
| 50 | def client_secret_jwt_sign( |
no test coverage detected
searching dependent graphs…