Return the cached auth check for this (instance, key) pair, or None on miss.
(instance_url: str, api_key: str)
| 43 | |
| 44 | |
| 45 | def load(instance_url: str, api_key: str) -> Optional[CachedAuthCheck]: |
| 46 | """Return the cached auth check for this (instance, key) pair, or None on miss.""" |
| 47 | path = _cache_file() |
| 48 | try: |
| 49 | with path.open("r") as f: |
| 50 | data = yaml.safe_load(f) |
| 51 | except FileNotFoundError: |
| 52 | return None |
| 53 | except (OSError, yaml.YAMLError) as e: |
| 54 | logger.warning("Could not load auth check cache: %s", repr(e)) |
| 55 | return None |
| 56 | |
| 57 | if not isinstance(data, dict): |
| 58 | return None |
| 59 | if data.get("key_hash") != _key_hash(instance_url, api_key): |
| 60 | return None |
| 61 | if data.get("expires_at", 0) < time.time(): |
| 62 | return None |
| 63 | |
| 64 | raw_scopes = data.get("scopes") |
| 65 | scopes: Optional[set[TokenScope]] |
| 66 | if raw_scopes is None: |
| 67 | scopes = None |
| 68 | else: |
| 69 | scopes = set() |
| 70 | for scope_str in raw_scopes: |
| 71 | try: |
| 72 | scopes.add(TokenScope(scope_str)) |
| 73 | except ValueError: |
| 74 | logger.debug("Ignoring unknown cached scope: '%s'", scope_str) |
| 75 | |
| 76 | raw_version = data.get("secrets_engine_version") |
| 77 | secrets_engine_version = raw_version if isinstance(raw_version, str) else None |
| 78 | |
| 79 | raw_max_payload = data.get("maximum_payload_size") |
| 80 | maximum_payload_size = raw_max_payload if isinstance(raw_max_payload, int) else None |
| 81 | |
| 82 | raw_ssp = data.get("secret_scan_preferences") |
| 83 | secret_scan_preferences: Optional[SecretScanPreferences] = None |
| 84 | if isinstance(raw_ssp, dict): |
| 85 | try: |
| 86 | secret_scan_preferences = SecretScanPreferences(**raw_ssp) |
| 87 | except TypeError as e: |
| 88 | logger.debug("Ignoring malformed cached secret_scan_preferences: %s", e) |
| 89 | |
| 90 | raw_rm = data.get("remediation_messages") |
| 91 | remediation_messages: Optional[RemediationMessages] = None |
| 92 | if isinstance(raw_rm, dict): |
| 93 | try: |
| 94 | remediation_messages = RemediationMessages(**raw_rm) |
| 95 | except TypeError as e: |
| 96 | logger.debug("Ignoring malformed cached remediation_messages: %s", e) |
| 97 | |
| 98 | return CachedAuthCheck( |
| 99 | scopes=scopes, |
| 100 | secrets_engine_version=secrets_engine_version, |
| 101 | maximum_payload_size=maximum_payload_size, |
| 102 | secret_scan_preferences=secret_scan_preferences, |
nothing calls this directly
no test coverage detected