(self, instance: Dict, with_valid=True)
| 475 | |
@transaction.atomic
def batch_switch(self, instance: Dict, with_valid=True):
    """Switch a batch of triggers on or off and (un)deploy them.

    Args:
        instance: Mapping read for ``"id_list"`` (the trigger ids to switch)
            and ``"is_active"`` (the target state).
        with_valid: When true, validate ``instance`` with
            ``BatchActiveSerializer`` and validate this serializer before
            doing any work; validation errors are raised.

    Returns:
        ``True`` once the rows are updated and every matching trigger has
        been handed to ``deploy`` / ``undeploy``.
    """
    # Imported locally, as in the original block.
    from trigger.handler.simple_tools import deploy, undeploy

    if with_valid:
        BatchActiveSerializer(data=instance).is_valid(model=Trigger, raise_exception=True)
        self.is_valid(raise_exception=True)

    workspace_id = self.data.get("workspace_id")
    trigger_id_list = instance.get("id_list")
    is_active = instance.get("is_active")

    # Only rows currently in the opposite state need their flag flipped.
    Trigger.objects.filter(
        workspace_id=workspace_id,
        id__in=trigger_id_list,
        is_active=not is_active,
    ).update(is_active=is_active)

    # Pick the handler once instead of duplicating the loop per branch.
    action = deploy if is_active else undeploy

    # Fetch every requested trigger in one query, scoped to the same
    # workspace as the update above. Ids that do not exist or that belong to
    # another workspace are simply absent, so they are never serialized as
    # ``None`` nor (un)deployed on behalf of a foreign workspace.
    # NOTE(review): iteration follows database order rather than the order of
    # ``id_list``, and duplicate ids are processed once.
    triggers = QuerySet(Trigger).filter(workspace_id=workspace_id, id__in=trigger_id_list)
    for trigger in triggers:
        action(TriggerModelSerializer(trigger).data)

    return True
| 498 | |
| 499 | |
| 500 | class TriggerOperateSerializer(serializers.Serializer): |
no test coverage detected