(trigger, request=None, **kwargs)
| 114 | |
@staticmethod
def execute(trigger, request=None, **kwargs):
    """Authenticate an incoming event request and run the trigger's active tasks.

    Args:
        trigger: Mapping describing the trigger; the keys read here are
            'trigger_setting', 'is_active' and 'id'.
        request: Incoming request object whose ``META`` mapping carries the
            bearer token under 'HTTP_AUTHORIZATION'.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        A 404 ``Result`` when the trigger is not active, otherwise
        ``result.success(True)`` once every active task has been executed.

    Raises:
        AppAuthenticationFailed: If the trigger has no usable token, no
            request (or no authorization header) was supplied, or the
            supplied token does not match the configured one.
    """
    import hmac

    # A missing 'trigger_setting' is handled like a missing token instead of
    # crashing with AttributeError on None.
    trigger_setting = trigger.get('trigger_setting') or {}
    token = trigger_setting.get('token')
    # A non-string token could never equal the header text, so it is rejected
    # up front together with the empty/missing case.
    if not token or not isinstance(token, str):
        raise AppAuthenticationFailed(1002, _('Authentication information is incorrect'))

    # `request` defaults to None; without a request there is nothing to
    # authenticate against, so fail authentication rather than raise
    # AttributeError.
    request_token = request.META.get('HTTP_AUTHORIZATION') if request is not None else None
    if not request_token:
        raise AppAuthenticationFailed(1002, _('Authentication information is incorrect'))

    # Strip the scheme only when it is a leading prefix; removing every
    # occurrence would corrupt tokens that contain 'Bearer ' themselves.
    scheme = 'Bearer '
    if request_token.startswith(scheme):
        supplied_token = request_token[len(scheme):]
    else:
        supplied_token = request_token

    # Constant-time comparison: the header is attacker-controlled. Both sides
    # are encoded because compare_digest rejects non-ASCII str arguments.
    if not hmac.compare_digest(token.encode('utf-8'), supplied_token.encode('utf-8')):
        raise AppAuthenticationFailed(1002, _('Authentication information is incorrect'))

    # Authentication is checked first, so an inactive trigger is only
    # reported as 404 to callers holding a valid token.
    if not trigger.get('is_active'):
        return Result(code=404, message="404", response_status=404)

    body = trigger_setting.get('body')
    parameters = get_parameters(body, request)

    # Serialize every active task before running any of them.
    trigger_task_list = [
        TriggerTaskResponse(trigger_task).data
        for trigger_task in QuerySet(TriggerTask).filter(trigger__id=trigger.get('id'), is_active=True)
    ]

    # Imported at call time, as in the original; aliased so it does not
    # shadow this method's own name inside the body.
    from trigger.handler.simple_tools import execute as execute_task
    for trigger_task in trigger_task_list:
        execute_task(trigger_task, body=parameters)
    return result.success(True)
| 135 | |
def support(self, trigger, **kwargs):
    """Return True when the trigger's 'trigger_type' is 'EVENT'.

    Args:
        trigger: Mapping describing the trigger; only 'trigger_type' is read.
        **kwargs: Accepted for interface compatibility; not used here.
    """
    trigger_type = trigger.get('trigger_type')
    return trigger_type == 'EVENT'
no test coverage detected