| 63 | self.metrics = get_prometheus_metrics() |
| 64 | |
def event(self, event):
    """Record an incoming event and update the matching Prometheus metrics.

    The event is first handed to the parent class's ``event()``. Afterwards
    the per-worker/per-type counter in ``self.counter`` is incremented and
    the metrics held in ``self.metrics`` are updated according to the
    event type (``task-*`` or ``worker-*``).

    Args:
        event: Mapping describing the event. ``event['hostname']`` and
            ``event['type']`` are read unconditionally, and
            ``event['uuid']`` is read for ``task-*`` events. ``'name'``,
            ``'runtime'`` and ``'active'`` are optional and read with
            ``.get()``.
    """
    # Save the event
    super().event(event)

    worker_name = event['hostname']
    event_type = event['type']

    self.counter[worker_name][event_type] += 1

    if event_type.startswith('task-'):
        task_id = event['uuid']
        task = self.tasks.get(task_id)

        # Prefer the name carried by the event; fall back to the stored
        # task's name when the event does not include one.
        task_name = event.get('name', '')
        if not task_name and task is not None:
            task_name = task.name or ''
        self.metrics.events.labels(worker_name, event_type, task_name).inc()

        runtime = event.get('runtime', 0)
        if runtime:
            self.metrics.runtime.labels(worker_name, task_name).observe(runtime)

        # The prefetch metrics depend on the stored task's timestamps.
        # ``self.tasks.get`` may return None, so skip them in that case
        # instead of failing with AttributeError on ``task.started``.
        # Tasks with an ``eta`` are excluded from the prefetch metrics.
        if task is not None and not task.eta:
            task_started = task.started
            task_received = task.received

            if event_type == 'task-received':
                if task_received:
                    self.metrics.number_of_prefetched_tasks.labels(worker_name, task_name).inc()
            elif event_type == 'task-started':
                if task_started and task_received:
                    self.metrics.prefetch_time.labels(worker_name, task_name).set(task_started - task_received)
                    self.metrics.number_of_prefetched_tasks.labels(worker_name, task_name).dec()
            elif event_type in ('task-succeeded', 'task-failed'):
                if task_started and task_received:
                    # The task has finished: reset the prefetch-time gauge.
                    self.metrics.prefetch_time.labels(worker_name, task_name).set(0)

    if event_type == 'worker-online':
        self.metrics.worker_online.labels(worker_name).set(1)

    if event_type == 'worker-heartbeat':
        # A heartbeat also marks the worker as online.
        self.metrics.worker_online.labels(worker_name).set(1)

        # 'active' is optional; only update the gauge when it is present.
        num_executing_tasks = event.get('active')
        if num_executing_tasks is not None:
            self.metrics.worker_number_of_currently_executing_tasks.labels(worker_name).set(num_executing_tasks)

    if event_type == 'worker-offline':
        self.metrics.worker_online.labels(worker_name).set(0)
| 111 | |
| 112 | |
| 113 | class Events(threading.Thread): |