(self, param: str, trans: I18n)
| 180 | return False, e |
| 181 | |
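async def validateEmbedded(self, param: str, trans: I18n) -> tuple[object, ...]:
    """Validate an embedded-assistant JWT and resolve its user and assistant.

    Args:
        param: The raw JWT presented by the embedded client.
        trans: i18n lookup used to build user-facing error messages.

    Returns:
        ``(True, session_user, assistant_info)`` on success, or
        ``(False, error)`` where ``error`` is a message string or the
        caught exception (same shape as the sibling validator).
    """
    try:
        # First pass: decode WITHOUT verifying the signature. This is only a
        # routing step to discover which assistant's secret must be used for
        # the real verification below; nothing read here is trusted.
        unverified: dict = jwt.decode(
            param,
            options={"verify_signature": False, "verify_exp": False},
            algorithms=[security.ALGORITHM],
        )
        # Cheap early rejection before any database work. The claim is
        # re-read from the verified payload before it is actually used.
        if not unverified.get('account'):
            return False, "Miss account payload error!"
        app_key = unverified.get('appId', '')
        embeddedId = unverified.get('embeddedId', None)
        if not embeddedId:
            # Tokens without an explicit embeddedId carry it encoded in appId.
            embeddedId = xor_decrypt(app_key)
        with Session(engine) as session:
            assistant_info = await get_assistant_info(session=session, assistant_id=embeddedId)
            assistant_info = AssistantModel.model_validate(assistant_info)
            # Second pass: full verification (signature and expiry) with the
            # per-assistant secret. Every claim used from here on comes from
            # this verified payload, never from the unverified first pass.
            payload: dict = jwt.decode(
                param, assistant_info.app_secret, algorithms=[security.ALGORITHM]
            )
            account = payload.get('account')
            if not account:
                return False, "Miss account payload error!"
            assistant_info = AssistantHeader.model_validate(assistant_info.model_dump(exclude_unset=True))
            session_user = get_user_by_account(session = session, account=account)
            if not session_user:
                message = trans('i18n_not_exist', msg = trans('i18n_user.account'))
                raise Exception(message)
            session_user = await get_user_info(session = session, user_id = session_user.id)
            session_user = UserInfoDTO.model_validate(session_user)
            if session_user.status != 1:
                message = trans('i18n_login.user_disable', msg = trans('i18n_concat_admin'))
                raise Exception(message)
            # A missing or zero oid means the user has no associated workspace.
            if not session_user.oid:
                message = trans('i18n_login.no_associated_ws', msg = trans('i18n_concat_admin'))
                raise Exception(message)
            assistant_info.oid = int(session_user.oid)
            return True, session_user, assistant_info
    except Exception as e:
        SQLBotLogUtil.exception(f"Embedded validation error: {str(e)}")
        # Mirror the sibling validator: hand the exception back to the caller.
        return False, e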
| 228 | |
| 229 | def xor_decrypt(encrypted_str: str, key: int = 0xABCD1234) -> int: |
| 230 | encrypted_bytes = base64.urlsafe_b64decode(encrypted_str) |