(
request: Request,
raw_key: str,
scope: str,
)
| 108 | |
| 109 | |
async def _require_api_key_scope(
    request: Request,
    raw_key: str,
    scope: str,
) -> AuthContext:
    """Authenticate an API key and check that it grants ``scope``.

    The raw key is never compared directly: it is hashed with
    ``ApiKeyService.hash_key`` and the active key record is looked up by
    that hash.

    Args:
        request: Incoming request; the database handle is read from
            ``request.app.state.db``.
        raw_key: API key exactly as presented by the caller.
        scope: Scope the endpoint requires.

    Returns:
        An ``AuthContext`` whose ``scopes`` are the key's stored scopes
        (not expanded through ``OPEN_API_SCOPE_INCLUDES``).

    Raises:
        ApiError: 403 when ``scope`` is not a known open-API scope or the
            key does not grant it; 401 when no active key matches the hash.
    """
    # An unknown scope is rejected before the key is looked up, so this
    # path answers 403 even when the key itself is invalid.
    if scope not in ALL_OPEN_API_SCOPES:
        raise ApiError("Insufficient API key scope", status_code=403)

    key_hash = ApiKeyService.hash_key(raw_key)
    store = request.app.state.db
    record = await store.get_active_api_key_by_hash(key_hash)
    if not record:
        raise ApiError("Invalid API key", status_code=401)

    if isinstance(record.scopes, list):
        granted = [str(item) for item in record.scopes]
    else:
        # NOTE(review): a key whose stored scopes are not a list (e.g. None)
        # is treated as holding every open-API scope. This fails open;
        # confirm it is intended (legacy keys?) rather than denying by default.
        granted = [str(item) for item in ALL_OPEN_API_SCOPES]

    # Access is granted by the wildcard, by an exact match, or by a held
    # scope that includes the requested one.
    permitted = "*" in granted or scope in granted
    if not permitted:
        for held in granted:
            if scope in OPEN_API_SCOPE_INCLUDES.get(held, ()):
                permitted = True
                break
    if not permitted:
        raise ApiError("Insufficient API key scope", status_code=403)

    # Only reached for authorised requests: rejected attempts do not touch
    # the key record.
    await store.touch_api_key(record.key_id)
    return AuthContext(
        username=f"api_key:{record.key_id}",
        scopes=granted,
        api_key_id=record.key_id,
        via="api_key",
    )
| 143 | |
| 144 | |
| 145 | async def require_scope(request: Request, scope: str) -> AuthContext: |