close job normally. this function needs to support reentry.
(cls, job_id, reclose=False)
| 356 | |
| 357 | @classmethod |
| 358 | def close_job(cls, job_id, reclose=False): |
| 359 | """close job normally. this function needs to suport reentry. |
| 360 | """ |
| 361 | logger.info("[Job: %s] start close_job ..." % job_id) |
| 362 | job = models.Job.objects.get(id=job_id) |
| 363 | scan_id = job.scan_id # 部分情况下job_context中没有scanid |
| 364 | logger.info("[Job: %s] get scan_id is %s" % (job_id, scan_id)) |
| 365 | logger.info("[Job: %s] checking task_done number (%d/%d)..." % (job_id, job.task_done, job.task_num)) |
| 366 | if job.task_num != job.task_done: # not to closed yet |
| 367 | logger.info("[Job: %s] check failed. task_done(%d) less than task_num(%d). end of close_job." % ( |
| 368 | job_id, job.task_done, job.task_num)) |
| 369 | return |
| 370 | logger.info("[Job: %d] check pass." % job_id) |
| 371 | |
| 372 | # check if there is older job unclosed |
| 373 | logger.info("[Job: %d] checking older job unclosed..." % job_id) |
| 374 | try: |
| 375 | older_jobs = models.Job.objects.filter( |
| 376 | project=job.project, id__lt=job.id |
| 377 | ).exclude( |
| 378 | state=models.Job.StateEnum.CLOSED |
| 379 | ) |
| 380 | if older_jobs: |
| 381 | logger.info("[Job: %d] canceling %d older scan jobs..." % (job_id, older_jobs.count())) |
| 382 | try: |
| 383 | for j in older_jobs: |
| 384 | if not j.check_redirect(): |
| 385 | continue |
| 386 | logger.info("[Job: %d] canceling older job[%d]..." % (job_id, j.id)) |
| 387 | result_msg = json.dumps( |
| 388 | {"job_id": job.id, "scan_id": scan_id, "msg": "plz check the other job's result"}) |
| 389 | JobCloseHandler.revoke_job(j, errcode.CLIENT_REDIRECT, result_msg) |
| 390 | except Exception as e: # all exception |
| 391 | logger.error("[Job: %d] cancel older scan failed. end of close_job." % job.id) |
| 392 | logger.exception(e) |
| 393 | models.Job.objects.filter(id=job_id, state=models.Job.StateEnum.RUNNING) \ |
| 394 | .update(result_msg="入库失败:有历史任务未完成入库") |
| 395 | return |
| 396 | logger.info("[Job: %s] check pass." % job_id) |
| 397 | except ObjectDoesNotExist: |
| 398 | logger.info( |
| 399 | "[Job: %s] check pass. scan.ObjectDoesNotExist no need to check." % job_id) |
| 400 | pass |
| 401 | |
| 402 | if reclose: # 支持重新入库 |
| 403 | models.Job.objects.filter(id=job_id, state=models.Job.StateEnum.CLOSED).update( |
| 404 | state=models.Job.StateEnum.RUNNING) |
| 405 | |
| 406 | logger.info("[Job: %d] checking if other is closing job ..." % job_id) |
| 407 | nrows = models.Job.objects.filter( |
| 408 | id=job_id, state__in=[models.Job.StateEnum.WAITING, models.Job.StateEnum.RUNNING]) \ |
| 409 | .update(state=models.Job.StateEnum.CLOSING, closing_time=timezone.now()) |
| 410 | if nrows == 0: # other is closing the job |
| 411 | logger.info("[Job: %d] unable to close due to it's %s" % |
| 412 | (job_id, job.get_state_display())) |
| 413 | return |
| 414 | logger.info("[Job: %d] check pass." % job_id) |
| 415 | job.result_code = errcode.OK |
no test coverage detected