MCPcopy
hub / github.com/Tencent/CodeAnalysis / revoke_job

Method revoke_job

server/projects/main/apps/job/core/base.py:233–299  ·  view source on GitHub ↗

Revoke a job. This function needs to support re-entry. Note: be careful of race conditions.

(cls, job, result_code, result_msg)

Source from the content-addressed store, hash-verified

231
232 @classmethod
233 def revoke_job(cls, job, result_code, result_msg):
234 """revoke job. this function need to support reentry
235
236 note: careful of race condition
237 """
238 job_id = job.id
239 nrows = models.Job.objects.filter(
240 id=job_id, state__in=[
241 models.Job.StateEnum.WAITING, models.Job.StateEnum.RUNNING,
242 models.Job.StateEnum.INITING, models.Job.StateEnum.INITED]
243 ).update(state=models.Job.StateEnum.CLOSING)
244 if nrows == 0:
245 return
246 logger.info("[Job: %s] revoke job, result: [%s]%s" % (job.id, result_code, result_msg))
247 job = models.Job.objects.get(id=job_id)
248 revoke_time = now()
249 for task in job.task_set.exclude(state=models.Task.StateEnum.CLOSED):
250 # 判断当前结果码是否客户端异常码,如果是则调整为任务取消码(299)
251 task_result_code = result_code
252 if errcode.is_node_error(task_result_code):
253 task_result_code = errcode.E_NODE_TASK_CANCEL
254
255 nrow = models.Task.objects \
256 .filter(id=task.id, state=models.Task.StateEnum.RUNNING) \
257 .update(state=models.Task.StateEnum.CLOSED,
258 result_code=task_result_code,
259 result_msg='Job revoked: %s' % result_msg,
260 end_time=revoke_time)
261 if nrow == 1: # race condition
262 # put this task to killingtask table
263 if task.node:
264 logger.info("[Task: %s][Node: %s] revoke job, update node to free state" % (task.id, task.node))
265 node_id = task.node.id # codepuppy 未上线_kill_task可能会导致继续给节点派发任务
266 Node.objects.filter(id=node_id).update(state=Node.StateEnum.FREE)
267 models.KillingTask.objects.create(node=task.node, task=task)
268 # 取消关联的task_process
269 cls.revoke_task_process_relation(task.id, task_result_code, result_msg, revoke_time)
270 continue
271 models.Task.objects.filter(id=task.id).update(
272 state=models.Task.StateEnum.CLOSED,
273 result_code=task_result_code,
274 result_msg='Job revoked: %s' % result_msg,
275 end_time=revoke_time)
276 # 取消关联的task_process
277 cls.revoke_task_process_relation(task.id, task_result_code, result_msg, revoke_time)
278
279 job.closing_time = revoke_time
280 job.result_code = result_code
281 job.result_msg = result_msg
282 job.state = models.Job.StateEnum.CLOSED
283 job.expire_time = revoke_time
284 job.save()
285
286 scan_id = job.scan_id
287 if scan_id:
288 logger.info(
289 "[Job: %s] request analyse server to close scan[%d] ..." % (job.id, scan_id))
290 try:

Callers 13

add_job_to_queue · Method · 0.80
close_job · Method · 0.80
put · Method · 0.80
cancel_job · Method · 0.80
handle_expired_job · Function · 0.80
create_local_scan · Function · 0.80
create_server_scan · Function · 0.80
_upload_task_params · Method · 0.80
_upload_job_context · Method · 0.80
start_job · Method · 0.80

Calls 9

close_scan · Method · 0.80
exception · Method · 0.80
update · Method · 0.45
info · Method · 0.45
get · Method · 0.45
create · Method · 0.45
save · Method · 0.45

Tested by

no test coverage detected