
Method validateEmbedded

backend/apps/system/middleware/auth.py:182–227  ·  view source on GitHub ↗
(self, param: str, trans: I18n)


180 return False, e
181
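async def validateEmbedded(self, param: str, trans: I18n) -> tuple:
    """Validate an embedded-assistant JWT and resolve the user it belongs to.

    The token is decoded twice. The first pass deliberately skips signature
    (and expiry) verification because the ``appId``/``embeddedId`` claims are
    needed to look up the assistant whose ``app_secret`` signs the token.
    Nothing from that pass is trusted beyond locating the assistant; every
    claim consumed afterwards is re-read from the signature-verified payload.

    Args:
        param: the raw JWT string presented by the embedded client.
        trans: i18n translator used to build user-facing error messages.

    Returns:
        ``(True, session_user, assistant_info)`` on success, otherwise
        ``(False, reason)`` where ``reason`` is a message string or the
        exception that aborted validation (also logged).
    """
    try:
        # Pass 1 - UNVERIFIED. Used only to find which assistant's secret to
        # verify with; these claims are untrusted until pass 2 succeeds.
        unverified: dict = jwt.decode(
            param,
            options={"verify_signature": False, "verify_exp": False},
            algorithms=[security.ALGORITHM],
        )
        app_key = unverified.get('appId', '')
        embeddedId = unverified.get('embeddedId', None) or xor_decrypt(app_key)
        # Cheap pre-check so a malformed token does not cost a DB round trip;
        # the account value itself is taken from the verified payload below.
        if not unverified.get('account'):
            return False, "Miss account payload error!"

        with Session(engine) as session:
            assistant_info = await get_assistant_info(session=session, assistant_id=embeddedId)
            assistant_info = AssistantModel.model_validate(assistant_info)
            # Pass 2 - VERIFIED with the assistant's secret. Expiry checking is
            # not disabled here, so the library's default exp check applies.
            # Raises on a bad signature or an expired token.
            payload: dict = jwt.decode(
                param, assistant_info.app_secret, algorithms=[security.ALGORITHM]
            )
            # Only consume claims from the verified payload from this point on.
            account = payload['account']
            assistant_info = AssistantHeader.model_validate(assistant_info.model_dump(exclude_unset=True))

            session_user = get_user_by_account(session=session, account=account)
            if not session_user:
                raise Exception(trans('i18n_not_exist', msg=trans('i18n_user.account')))
            session_user = await get_user_info(session=session, user_id=session_user.id)
            session_user = UserInfoDTO.model_validate(session_user)
            if session_user.status != 1:
                raise Exception(trans('i18n_login.user_disable', msg=trans('i18n_concat_admin')))
            # A falsy oid (None or 0) means the user has no workspace association.
            if not session_user.oid:
                raise Exception(trans('i18n_login.no_associated_ws', msg=trans('i18n_concat_admin')))
            assistant_info.oid = int(session_user.oid)
            return True, session_user, assistant_info
    except Exception as e:
        SQLBotLogUtil.exception(f"Embedded validation error: {str(e)}")
        # Return False and the exception so the caller can surface the reason.
        return False, e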
228
229def xor_decrypt(encrypted_str: str, key: int = 0xABCD1234) -> int:
230 encrypted_bytes = base64.urlsafe_b64decode(encrypted_str)
