(self, instance: Dict, with_valid=True)
| 456 | |
@transaction.atomic
def batch_delete(self, instance: Dict, with_valid=True):
    """Undeploy and delete a batch of triggers with their tasks and task records.

    Only triggers that belong to this serializer's ``workspace_id`` are
    processed; requested ids that do not resolve to a trigger in that
    workspace are ignored instead of being undeployed or having their
    related rows removed.

    Args:
        instance: Payload dict; the trigger ids are read from its
            ``id_list`` key.
        with_valid: When true, validate ``instance`` through
            ``BatchSerializer`` and validate this serializer first.

    Returns:
        ``True`` once the batch has been processed.

    Raises:
        Whatever the ``is_valid(..., raise_exception=True)`` calls raise
        when validation fails.
    """
    # Kept as a function-scope import, as in the original
    # (presumably to avoid a circular import -- TODO confirm).
    from trigger.handler.simple_tools import undeploy

    if with_valid:
        BatchSerializer(data=instance).is_valid(model=Trigger, raise_exception=True)
        self.is_valid(raise_exception=True)
    workspace_id = self.data.get("workspace_id")
    # A missing id_list (possible when validation is skipped) is treated as empty.
    requested_ids = instance.get("id_list") or []

    # Resolve the triggers in one query, scoped to the workspace, so that ids
    # from other workspaces or stale ids never reach undeploy or the deletes.
    triggers = list(
        QuerySet(Trigger).filter(workspace_id=workspace_id, id__in=requested_ids)
    )
    for trigger in triggers:
        undeploy(TriggerModelSerializer(trigger).data)

    # Delete dependents first, restricted to the ids that were actually resolved.
    trigger_id_list = [trigger.id for trigger in triggers]
    TaskRecord.objects.filter(trigger_id__in=trigger_id_list).delete()
    TriggerTask.objects.filter(trigger_id__in=trigger_id_list).delete()
    Trigger.objects.filter(workspace_id=workspace_id, id__in=trigger_id_list).delete()

    return True
| 475 | |
| 476 | @transaction.atomic |
| 477 | def batch_switch(self, instance: Dict, with_valid=True): |
no test coverage detected