MCPcopy Index your code
hub / github.com/1Panel-dev/MaxKB / OpenChatSerializers

Class OpenChatSerializers

apps/chat/serializers/chat.py:531–600  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

529
530
531class OpenChatSerializers(serializers.Serializer):
532 workspace_id = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_("Workspace ID"))
533 application_id = serializers.UUIDField(required=True)
534 chat_user_id = serializers.CharField(required=True, label=_("Client id"))
535 chat_user_type = serializers.CharField(required=True, label=_("Client Type"))
536 debug = serializers.BooleanField(required=True, label=_("Debug"))
537 ip_address = serializers.CharField(required=False, label=_("IP Address"))
538 source = serializers.JSONField(required=False, label=_("Source"))
539
540 def is_valid(self, *, raise_exception=False):
541 super().is_valid(raise_exception=True)
542 workspace_id = self.data.get('workspace_id')
543 application_id = self.data.get('application_id')
544 query_set = QuerySet(Application).filter(id=application_id)
545 if workspace_id:
546 query_set = query_set.filter(workspace_id=workspace_id)
547 if not query_set.exists():
548 raise AppApiException(500, gettext('Application does not exist'))
549
550 def open(self):
551 self.is_valid(raise_exception=True)
552 application_id = self.data.get('application_id')
553 application = QuerySet(Application).get(id=application_id)
554 debug = self.data.get("debug")
555 if not debug:
556 application_version = QuerySet(ApplicationVersion).filter(application_id=application_id).order_by(
557 '-create_time')[0:1].first()
558 if application_version is None:
559 raise AppApiException(500,
560 _("The application has not been published. Please use it after publishing."))
561 if application.type == ApplicationTypeChoices.SIMPLE:
562 return self.open_simple(application)
563 else:
564 return self.open_work_flow(application)
565
566 def open_work_flow(self, application):
567 self.is_valid(raise_exception=True)
568 application_id = self.data.get('application_id')
569 chat_user_id = self.data.get("chat_user_id")
570 chat_user_type = self.data.get("chat_user_type")
571 ip_address = self.data.get("ip_address")
572 source = self.data.get("source")
573 debug = self.data.get("debug")
574 chat_id = str(uuid.uuid7())
575 ChatInfo(chat_id, chat_user_id, chat_user_type, ip_address, source, [],
576 [],
577 application_id, debug).set_cache()
578 return chat_id
579
580 def open_simple(self, application):
581 application_id = self.data.get('application_id')
582 chat_user_id = self.data.get("chat_user_id")
583 chat_user_type = self.data.get("chat_user_type")
584 ip_address = self.data.get("ip_address")
585 source = self.data.get("source")
586 debug = self.data.get("debug")
587 knowledge_id_list = [str(row.target_id) for row in
588 QuerySet(ResourceMapping).filter(source_id=str(application_id),

Callers 3

getMethod · 0.90
_get_chat_idMethod · 0.90
getMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected