Validator scheduler filters tasks according to task name since it's task name stands for task type
(self, task)
| 123 | |
| 124 | class ValidatorScheduler(BaseScheduler): |
| 125 | def schedule_task_with_lock(self, task): |
| 126 | """Validator scheduler filters tasks according to task name |
| 127 | since it's task name stands for task type""" |
| 128 | if not task.get('enable'): |
| 129 | return None |
| 130 | task_queue = task.get('task_queue') |
| 131 | if task_queue not in self.task_queues: |
| 132 | return None |
| 133 | |
| 134 | conn = get_redis_conn() |
| 135 | internal = task.get('internal') |
| 136 | task_name = task.get('name') |
| 137 | resource_queue = task.get('resource') |
| 138 | lock_indentifier = acquire_lock(conn, task_name) |
| 139 | if not lock_indentifier: |
| 140 | return False |
| 141 | pipe = conn.pipeline(True) |
| 142 | try: |
| 143 | now = int(time.time()) |
| 144 | pipe.hget(TIMER_RECORDER, task_name) |
| 145 | pipe.zrevrangebyscore(resource_queue, '+inf', '-inf') |
| 146 | r, proxies = pipe.execute() |
| 147 | if not r or (now - int(r.decode('utf-8'))) >= internal * 60: |
| 148 | if not proxies: |
| 149 | scheduler_logger.warning('fetched no proxies from task {}'.format(task_name)) |
| 150 | return None |
| 151 | |
| 152 | pipe.sadd(task_queue, *proxies) |
| 153 | pipe.hset(TIMER_RECORDER, task_name, now) |
| 154 | pipe.execute() |
| 155 | scheduler_logger.info('validator task {} has been stored into redis successfully'.format(task_name)) |
| 156 | return True |
| 157 | else: |
| 158 | return None |
| 159 | finally: |
| 160 | release_lock(conn, task_name, lock_indentifier) |
| 161 | |
| 162 | |
| 163 | @click.command() |
nothing calls this directly
no test coverage detected