MCP · copy · Index your code
hub / github.com/1Panel-dev/MaxKB / TriggerTaskRecordQuerySerializer

Class TriggerTaskRecordQuerySerializer

apps/trigger/serializers/trigger_task.py:125–176  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

123
124
class TriggerTaskRecordQuerySerializer(serializers.Serializer):
    """Query serializer for the task records belonging to one trigger.

    It validates the incoming filter parameters, checks that the referenced
    trigger exists, and builds the filtered/ordered query set consumed by
    ``list`` and ``page``.
    """

    # Required: the trigger whose task records are being queried.
    trigger_id = serializers.CharField(required=True, label=_("Trigger ID"))
    # Optional: when present, the trigger existence check is scoped to it.
    workspace_id = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_("Workspace ID"))
    # Optional exact-match filter on the record state.
    state = serializers.CharField(required=False, allow_blank=True, allow_null=True, label=_('Trigger state'))
    # Optional substring filter on the name column.
    name = serializers.CharField(required=False, allow_blank=True, allow_null=True, label=_('Trigger name'))
    # Optional exact-match filter on the source type.
    source_type = serializers.CharField(required=False, allow_blank=True, allow_null=True, label=_('Source type'))
    # Optional ordering expression; falls back to newest-first when empty.
    order = serializers.CharField(required=False, allow_null=True, allow_blank=True, label=_('Order field'))

    def is_valid(self, *, raise_exception=False):
        """Validate the fields and confirm that the target trigger exists.

        Field validation is always run with ``raise_exception=True``; the
        ``raise_exception`` argument is accepted for signature compatibility
        with the base class but is not consulted here.

        Returns:
            bool: ``True`` when validation succeeds. Failures are reported by
            raising, never by returning ``False``.

        Raises:
            AppApiException: with code 500 when no trigger matches
                ``trigger_id`` (and ``workspace_id``, if one was supplied).
        """
        super().is_valid(raise_exception=True)
        workspace_id = self.data.get('workspace_id')
        query_set = QuerySet(Trigger).filter(id=self.data.get('trigger_id'))
        if workspace_id:
            # Only narrow by workspace when the caller actually provided one.
            query_set = query_set.filter(workspace_id=workspace_id)
        if not query_set.exists():
            raise AppApiException(500, _('Trigger id does not exist'))
        # Honour the base-class contract so ``if serializer.is_valid():`` works;
        # previously this method fell through and returned None.
        return True

    def get_query_set(self):
        """Build the filtered and ordered query set for the task records.

        The query set is created over a dynamic model whose field names carry
        ``ett.`` / ``sdc.`` prefixes.
        NOTE(review): these presumably correspond to table aliases in the SQL
        file used by ``page`` - confirm against that file.

        Returns:
            The query set filtered by ``trigger_id`` plus any of the optional
            ``state``, ``name`` and ``source_type`` filters, ordered by
            ``order`` when given and by ``-ett.create_time`` otherwise.
        """
        trigger_query_set = QuerySet(
            model=get_dynamics_model({
                'ett.create_time': models.DateTimeField(),
                'ett.state': models.CharField(),
                'sdc.name': models.CharField(),
                'ett.workspace_id': models.CharField(),
                'ett.trigger_id': models.UUIDField(),
                'sdc.source_type': models.CharField()
            }))
        trigger_query_set = trigger_query_set.filter(
            **{'ett.trigger_id': self.data.get("trigger_id")})

        # Caller-supplied ordering wins; the default is newest records first.
        order = self.data.get("order")
        if order:
            trigger_query_set = trigger_query_set.order_by(order)
        else:
            trigger_query_set = trigger_query_set.order_by("-ett.create_time")

        # Optional filters are applied only when a non-empty value was given.
        state = self.data.get('state')
        if state:
            trigger_query_set = trigger_query_set.filter(**{'ett.state': state})
        name = self.data.get("name")
        if name:
            trigger_query_set = trigger_query_set.filter(**{'sdc.name__contains': name})
        source_type = self.data.get('source_type')
        if source_type:
            trigger_query_set = trigger_query_set.filter(**{'sdc.source_type': source_type})
        return trigger_query_set

    def list(self, with_valid=True):
        """Return every matching row wrapped in ``TriggerTaskResponse``.

        Args:
            with_valid: when true, run ``is_valid`` before querying.

        Returns:
            A list holding the ``.data`` of a ``TriggerTaskResponse`` built
            from each row of ``get_query_set()``.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
        return [TriggerTaskResponse(row).data for row in self.get_query_set()]

    def page(self, current_page, page_size, with_valid=True):
        """Return one page of matching task records.

        Args:
            current_page: page index forwarded to ``native_page_search``.
            page_size: page size forwarded to ``native_page_search``.
            with_valid: when true, run ``is_valid`` before querying.

        Returns:
            Whatever ``native_page_search`` returns for the query set from
            ``get_query_set()`` and the SQL read from
            ``apps/trigger/sql/get_trigger_task_record_page_list.sql``.
        """
        if with_valid:
            self.is_valid(raise_exception=True)
        return native_page_search(current_page, page_size, self.get_query_set(), get_file_content(
            os.path.join(PROJECT_DIR, "apps", "trigger", "sql", 'get_trigger_task_record_page_list.sql')
        ))

Callers 1

getMethod · 0.90

Calls

no outgoing calls

Tested by

no test coverage detected