(instance)
| 96 | |
| 97 | @staticmethod |
| 98 | def login(instance): |
| 99 | # 解密数据 |
| 100 | username = instance.get("username", "") |
| 101 | encrypted_data = instance.get("encryptedData", "") |
| 102 | |
| 103 | if encrypted_data: |
| 104 | try: |
| 105 | decrypted_raw = decrypt(encrypted_data) |
| 106 | # decrypt 可能返回非 JSON 字符串,防护解析异常 |
| 107 | decrypted_data = json.loads(decrypted_raw) if decrypted_raw else {} |
| 108 | if isinstance(decrypted_data, dict): |
| 109 | instance.update(decrypted_data) |
| 110 | except Exception as e: |
| 111 | maxkb_logger.exception("Failed to decrypt/parse encryptedData for user %s: %s", username, e) |
| 112 | raise AppApiException(500, _("Invalid encrypted data")) |
| 113 | |
| 114 | try: |
| 115 | LoginRequest(data=instance).is_valid(raise_exception=True) |
| 116 | except serializers.ValidationError: |
| 117 | raise |
| 118 | except Exception as e: |
| 119 | raise AppApiException(500, str(e)) |
| 120 | |
| 121 | password = instance.get("password") |
| 122 | captcha = instance.get("captcha", "") |
| 123 | |
| 124 | # 获取认证配置 |
| 125 | auth_setting = LoginSerializer.get_auth_setting() |
| 126 | max_attempts = auth_setting.get("max_attempts", 1) |
| 127 | failed_attempts = auth_setting.get("failed_attempts", 5) |
| 128 | lock_time = auth_setting.get("lock_time", 10) |
| 129 | |
| 130 | # 检查许可证有效性 |
| 131 | license_validator = DatabaseModelManage.get_model("license_is_valid") or (lambda: False) |
| 132 | is_license_valid = license_validator() if license_validator() is not None else False |
| 133 | |
| 134 | if is_license_valid: |
| 135 | # 检查账户是否被锁定 |
| 136 | if LoginSerializer._is_account_locked(username, failed_attempts): |
| 137 | raise AppApiException( |
| 138 | 1005, _("This account has been locked for %s minutes, please try again later") % lock_time |
| 139 | ) |
| 140 | |
| 141 | # 验证验证码 |
| 142 | if LoginSerializer._need_captcha(username, max_attempts): |
| 143 | LoginSerializer._validate_captcha(username, captcha) |
| 144 | |
| 145 | # 验证用户凭据:先按用户名查找,再用 password_verify 验证密码 |
| 146 | user = User.objects.filter(username=username).first() |
| 147 | |
| 148 | if not user or not password_verify(password, user.password): |
| 149 | LoginSerializer._handle_failed_login(username, is_license_valid, failed_attempts, lock_time) |
| 150 | raise AppApiException(500, _("The username or password is incorrect")) |
| 151 | |
| 152 | # Transparently upgrade legacy MD5 hash to PBKDF2 |
| 153 | if needs_password_upgrade(user.password): |
| 154 | user.password = password_encrypt(password) |
| 155 | user.save(update_fields=["password"]) |
no test coverage detected