Watches handle_event and handle_batch tasks and cancels them if they exceed their max runtime.
(self)
| 950 | return await task |
| 951 | |
| 952 | async def _event_handler_watchdog(self): |
| 953 | """ |
| 954 | Watches handle_event and handle_batch tasks and cancels them if they exceed their max runtime. |
| 955 | """ |
| 956 | while not self.scan.stopping and not self.errored: |
| 957 | # if there are events in the outgoing queue, we leave the tasks alone |
| 958 | if self.outgoing_event_queue.qsize() > 0: |
| 959 | await self.helpers.sleep(self._event_handler_watchdog_interval) |
| 960 | continue |
| 961 | event_handler_tasks = [ |
| 962 | t for t in self._task_counter.tasks.values() if t.function_name in ("handle_event", "handle_batch") |
| 963 | ] |
| 964 | for task in event_handler_tasks: |
| 965 | if task.running_for > self.event_handler_timeout: |
| 966 | self.warning( |
| 967 | f"{self.name} Cancelling event handler task {task.task_name} because it's been running for {task.running_for:.1f}s (max timeout is {self.event_handler_timeout})" |
| 968 | ) |
| 969 | await task.cancel() |
| 970 | await asyncio.sleep(self._event_handler_watchdog_interval) |
| 971 | |
| 972 | async def queue_event(self, event): |
| 973 | """ |