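class JWTBearerTokenValidator(BearerTokenValidator):
    """Bearer-token validator for access tokens that are signed JWTs.

    A token is accepted only if ``jwt.decode`` verifies it against the
    configured key and its claims pass ``self.claims_options``.

    Security notes for integrators:

    * ``exp``, ``client_id`` and ``grant_type`` must be present in the token.
    * ``iss`` is checked only when an ``issuer`` was passed to ``__init__``;
      without it, a token signed by the configured key is accepted whatever
      issuer it names.
    * No ``aud`` option is configured here, so audience restriction is not
      enforced by this class.
    """

    TOKEN_TYPE = "bearer"
    token_cls = JWTBearerToken

    def __init__(self, public_key, issuer=None, realm=None, **extra_attributes):
        """Store the verification key and build the claim requirements.

        :param public_key: key material, passed through ``import_any_key``
            before it is stored.
        :param issuer: expected ``iss`` value; when falsy the ``iss`` claim
            is not validated at all.
        :param realm: forwarded to ``BearerTokenValidator.__init__``.
        :param extra_attributes: forwarded to ``BearerTokenValidator.__init__``.
        """
        super().__init__(realm, **extra_attributes)
        self.public_key = import_any_key(public_key)
        claims_options = {
            "exp": {"essential": True},
            "client_id": {"essential": True},
            "grant_type": {"essential": True},
        }
        if issuer:
            # Pin the issuer only when the caller supplied one.
            claims_options["iss"] = {"essential": True, "value": issuer}
        self.claims_options = claims_options

    def authenticate_token(self, token_string: str):
        """Verify ``token_string`` and return the wrapped claims, or ``None``.

        Any ``JoseError`` raised while decoding or while validating claims
        is logged at debug level and turned into ``None``. Only the error is
        logged, never the token string.

        :param token_string: the compact JWT taken from the request.
        :return: ``self.token_cls`` built from the claims, or ``None`` when
            the token is rejected.
        """
        try:
            # NOTE(review): no ``algorithms`` argument is passed, so the set
            # of accepted signature algorithms is whatever ``jwt.decode``
            # allows by default for this key -- confirm that this matches
            # what the issuer actually uses.
            token = jwt.decode(token_string, self.public_key)
        except JoseError as error:
            logger.debug("Authenticate token failed. %r", error)
            return None

        # leeway=60 is handed to the registry; presumably seconds of
        # clock-skew tolerance on time-based claims such as ``exp``.
        claims_requests = jwt.JWTClaimsRegistry(leeway=60, **self.claims_options)
        try:
            claims_requests.validate(token.claims)
        except JoseError as error:
            logger.debug("Authenticate token failed. %r", error)
            return None

        # Use the class attribute rather than the hard-coded JWTBearerToken so
        # a subclass that overrides ``token_cls`` gets its own token type back.
        return self.token_cls(token.claims)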