注册全局鉴权钩子。
(app)
| 84 | |
| 85 | |
| 86 | def init_auth(app): |
| 87 | """注册全局鉴权钩子。""" |
| 88 | |
| 89 | @app.before_request |
| 90 | def _authenticate(): # noqa: ANN202 |
| 91 | if request.method == "OPTIONS": |
| 92 | return None # CORS 预检放行 |
| 93 | path = request.path |
| 94 | if is_public_path(path) or not path.startswith("/api/"): |
| 95 | return None |
| 96 | |
| 97 | configured = configured_api_key() |
| 98 | |
| 99 | # 1) 显式提供了 Bearer 凭证:必须有效(JWT 或承载的 API Key),否则一律 401(含篡改/过期) |
| 100 | auth = request.headers.get("Authorization", "") |
| 101 | if auth.startswith("Bearer "): |
| 102 | token = auth[len("Bearer "):].strip() |
| 103 | try: |
| 104 | claims = decode_token(token) |
| 105 | if claims.get("type") == "access": |
| 106 | g.identity = { |
| 107 | "auth": "jwt", |
| 108 | "sub": claims.get("sub"), |
| 109 | "role": claims.get("role", "viewer"), |
| 110 | } |
| 111 | return None |
| 112 | except jwt.PyJWTError: |
| 113 | pass |
| 114 | if configured and token == configured: |
| 115 | g.identity = {"auth": "api_key", "role": "admin"} |
| 116 | return None |
| 117 | # Bearer 提供了但无效,不再降级开放 |
| 118 | return error_response("UNAUTHORIZED") |
| 119 | |
| 120 | # 2) X-API-Key / 开放模式 |
| 121 | if not configured: |
| 122 | g.identity = {"auth": "open", "role": "admin"} |
| 123 | return None |
| 124 | |
| 125 | provided = request.headers.get("X-API-Key") |
| 126 | if not provided or provided != configured: |
| 127 | return error_response("UNAUTHORIZED") |
| 128 | |
| 129 | g.identity = {"auth": "api_key", "role": "admin"} |
| 130 | return None |
| 131 | |
| 132 | |
| 133 | def require_auth(fn): |