MCPcopy
hub / github.com/SpiderClub/haipproxy / schedule_task_with_lock

Method schedule_task_with_lock

scheduler/scheduler.py:125–160  ·  view source on GitHub ↗

Validator scheduler filters tasks according to task name, since its task name stands for the task type.

(self, task)

Source from the content-addressed store, hash-verified

123
class ValidatorScheduler(BaseScheduler):
    def schedule_task_with_lock(self, task):
        """Queue a validator task's proxies if the task is enabled and due.

        The task is skipped unless its ``enable`` flag is truthy and its
        ``task_queue`` is one of ``self.task_queues``. A lock keyed by the
        task name is held while the last-run timestamp is checked and the
        queue is filled, and is always released afterwards.

        Args:
            task: mapping read with ``.get`` for the keys ``enable``,
                ``task_queue``, ``internal``, ``name`` and ``resource``.
                ``internal`` is multiplied by 60 and compared against a
                difference of ``time.time()`` seconds, i.e. it is treated
                as a number of minutes.

        Returns:
            ``True`` when the proxies were added to the task queue and the
            timer was updated; ``False`` when the lock could not be
            acquired; ``None`` when the task is disabled, targets a queue
            this scheduler does not handle, is not due yet, or the
            resource queue yielded no proxies.
        """
        if not task.get('enable'):
            return None
        queue_name = task.get('task_queue')
        if queue_name not in self.task_queues:
            return None

        redis_conn = get_redis_conn()
        interval_minutes = task.get('internal')
        name = task.get('name')
        source_key = task.get('resource')

        lock_id = acquire_lock(redis_conn, name)
        if not lock_id:
            return False

        txn = redis_conn.pipeline(True)
        try:
            now = int(time.time())
            # Read the last-run timestamp and every member of the resource
            # sorted set (highest score first) in a single round trip.
            txn.hget(TIMER_RECORDER, name)
            txn.zrevrangebyscore(source_key, '+inf', '-inf')
            last_run, proxies = txn.execute()

            # A missing timestamp means the task has never run and is due now.
            if last_run:
                elapsed = now - int(last_run.decode('utf-8'))
                if elapsed < interval_minutes * 60:
                    return None

            if not proxies:
                scheduler_logger.warning('fetched no proxies from task {}'.format(name))
                return None

            # Fill the task queue and record this run together.
            txn.sadd(queue_name, *proxies)
            txn.hset(TIMER_RECORDER, name, now)
            txn.execute()
            scheduler_logger.info('validator task {} has been stored into redis successfully'.format(name))
            return True
        finally:
            release_lock(redis_conn, name, lock_id)
161
162
163@click.command()

Callers

nothing calls this directly

Calls 4

get_redis_connFunction · 0.90
acquire_lockFunction · 0.90
release_lockFunction · 0.90
getMethod · 0.80

Tested by

no test coverage detected