| 82 | t.join() |
| 83 | |
| 84 | def run(self, loop=True): |
| 85 | logger.info('Starting {} {} workers'.format(self.num_workers, self.task_type)) |
| 86 | |
| 87 | if self.num_workers > 1: |
| 88 | for i in range(self.num_workers): |
| 89 | t = threading.Thread(target=self.__worker) |
| 90 | t.start() |
| 91 | self.threads.append(t) |
| 92 | |
| 93 | try: |
| 94 | while True: |
| 95 | requeue_stuck_tasks(self.task_type) |
| 96 | if self.task_type == 'classify.color': |
| 97 | task_queryset = Task.objects.filter(library__classification_color_enabled=True, type=self.task_type, status='P') |
| 98 | elif self.task_type == 'classify.location': |
| 99 | task_queryset = Task.objects.filter(library__classification_location_enabled=True, type=self.task_type, status='P') |
| 100 | elif self.task_type == 'classify.face': |
| 101 | task_queryset = Task.objects.filter(library__classification_face_enabled=True, type=self.task_type, status='P') |
| 102 | elif self.task_type == 'classify.style': |
| 103 | task_queryset = Task.objects.filter(library__classification_style_enabled=True, type=self.task_type, status='P') |
| 104 | elif self.task_type == 'classify.object': |
| 105 | task_queryset = Task.objects.filter(library__classification_object_enabled=True, type=self.task_type, status='P') |
| 106 | else: |
| 107 | task_queryset = Task.objects.filter(type=self.task_type, status='P') |
| 108 | for task in task_queryset[:8]: |
| 109 | if self.num_workers > 1: |
| 110 | logger.debug('putting task') |
| 111 | self.queue.put(task) |
| 112 | else: |
| 113 | self.__process_task(task) |
| 114 | |
| 115 | if self.num_workers > 1: |
| 116 | self.queue.join() |
| 117 | |
| 118 | if not loop: |
| 119 | self.__clean_up() |
| 120 | return |
| 121 | sleep(1) |
| 122 | |
| 123 | except KeyboardInterrupt: |
| 124 | self.__clean_up() |