MCPcopy
hub / github.com/Tencent/CodeAnalysis / close_job

Method close_job

server/projects/main/apps/job/core/base.py:358–428  ·  view source on GitHub ↗

close job normally. this function needs to suport reentry.

(cls, job_id, reclose=False)

Source from the content-addressed store, hash-verified

356
357 @classmethod
358 def close_job(cls, job_id, reclose=False):
359 """close job normally. this function needs to suport reentry.
360 """
361 logger.info("[Job: %s] start close_job ..." % job_id)
362 job = models.Job.objects.get(id=job_id)
363 scan_id = job.scan_id # 部分情况下job_context中没有scanid
364 logger.info("[Job: %s] get scan_id is %s" % (job_id, scan_id))
365 logger.info("[Job: %s] checking task_done number (%d/%d)..." % (job_id, job.task_done, job.task_num))
366 if job.task_num != job.task_done: # not to closed yet
367 logger.info("[Job: %s] check failed. task_done(%d) less than task_num(%d). end of close_job." % (
368 job_id, job.task_done, job.task_num))
369 return
370 logger.info("[Job: %d] check pass." % job_id)
371
372 # check if there is older job unclosed
373 logger.info("[Job: %d] checking older job unclosed..." % job_id)
374 try:
375 older_jobs = models.Job.objects.filter(
376 project=job.project, id__lt=job.id
377 ).exclude(
378 state=models.Job.StateEnum.CLOSED
379 )
380 if older_jobs:
381 logger.info("[Job: %d] canceling %d older scan jobs..." % (job_id, older_jobs.count()))
382 try:
383 for j in older_jobs:
384 if not j.check_redirect():
385 continue
386 logger.info("[Job: %d] canceling older job[%d]..." % (job_id, j.id))
387 result_msg = json.dumps(
388 {"job_id": job.id, "scan_id": scan_id, "msg": "plz check the other job's result"})
389 JobCloseHandler.revoke_job(j, errcode.CLIENT_REDIRECT, result_msg)
390 except Exception as e: # all exception
391 logger.error("[Job: %d] cancel older scan failed. end of close_job." % job.id)
392 logger.exception(e)
393 models.Job.objects.filter(id=job_id, state=models.Job.StateEnum.RUNNING) \
394 .update(result_msg="入库失败:有历史任务未完成入库")
395 return
396 logger.info("[Job: %s] check pass." % job_id)
397 except ObjectDoesNotExist:
398 logger.info(
399 "[Job: %s] check pass. scan.ObjectDoesNotExist no need to check." % job_id)
400 pass
401
402 if reclose: # 支持重新入库
403 models.Job.objects.filter(id=job_id, state=models.Job.StateEnum.CLOSED).update(
404 state=models.Job.StateEnum.RUNNING)
405
406 logger.info("[Job: %d] checking if other is closing job ..." % job_id)
407 nrows = models.Job.objects.filter(
408 id=job_id, state__in=[models.Job.StateEnum.WAITING, models.Job.StateEnum.RUNNING]) \
409 .update(state=models.Job.StateEnum.CLOSING, closing_time=timezone.now())
410 if nrows == 0: # other is closing the job
411 logger.info("[Job: %d] unable to close due to it's %s" %
412 (job_id, job.get_state_display()))
413 return
414 logger.info("[Job: %d] check pass." % job_id)
415 job.result_code = errcode.OK

Callers 3

putMethod · 0.80
monitor_running_jobFunction · 0.80
create_tasksMethod · 0.80

Calls 9

check_redirectMethod · 0.80
revoke_jobMethod · 0.80
errorMethod · 0.80
exceptionMethod · 0.80
close_scanMethod · 0.80
infoMethod · 0.45
getMethod · 0.45
updateMethod · 0.45

Tested by

no test coverage detected