Revoke a job. This function needs to support re-entry. Note: be careful of race conditions.
(cls, job, result_code, result_msg)
| 231 | |
| 232 | @classmethod |
| 233 | def revoke_job(cls, job, result_code, result_msg): |
| 234 | """revoke job. this function need to support reentry |
| 235 | |
| 236 | note: careful of race condition |
| 237 | """ |
| 238 | job_id = job.id |
| 239 | nrows = models.Job.objects.filter( |
| 240 | id=job_id, state__in=[ |
| 241 | models.Job.StateEnum.WAITING, models.Job.StateEnum.RUNNING, |
| 242 | models.Job.StateEnum.INITING, models.Job.StateEnum.INITED] |
| 243 | ).update(state=models.Job.StateEnum.CLOSING) |
| 244 | if nrows == 0: |
| 245 | return |
| 246 | logger.info("[Job: %s] revoke job, result: [%s]%s" % (job.id, result_code, result_msg)) |
| 247 | job = models.Job.objects.get(id=job_id) |
| 248 | revoke_time = now() |
| 249 | for task in job.task_set.exclude(state=models.Task.StateEnum.CLOSED): |
| 250 | # 判断当前结果码是否客户端异常码,如果是则调整为任务取消码(299) |
| 251 | task_result_code = result_code |
| 252 | if errcode.is_node_error(task_result_code): |
| 253 | task_result_code = errcode.E_NODE_TASK_CANCEL |
| 254 | |
| 255 | nrow = models.Task.objects \ |
| 256 | .filter(id=task.id, state=models.Task.StateEnum.RUNNING) \ |
| 257 | .update(state=models.Task.StateEnum.CLOSED, |
| 258 | result_code=task_result_code, |
| 259 | result_msg='Job revoked: %s' % result_msg, |
| 260 | end_time=revoke_time) |
| 261 | if nrow == 1: # race condition |
| 262 | # put this task to killingtask table |
| 263 | if task.node: |
| 264 | logger.info("[Task: %s][Node: %s] revoke job, update node to free state" % (task.id, task.node)) |
| 265 | node_id = task.node.id # codepuppy 未上线_kill_task可能会导致继续给节点派发任务 |
| 266 | Node.objects.filter(id=node_id).update(state=Node.StateEnum.FREE) |
| 267 | models.KillingTask.objects.create(node=task.node, task=task) |
| 268 | # 取消关联的task_process |
| 269 | cls.revoke_task_process_relation(task.id, task_result_code, result_msg, revoke_time) |
| 270 | continue |
| 271 | models.Task.objects.filter(id=task.id).update( |
| 272 | state=models.Task.StateEnum.CLOSED, |
| 273 | result_code=task_result_code, |
| 274 | result_msg='Job revoked: %s' % result_msg, |
| 275 | end_time=revoke_time) |
| 276 | # 取消关联的task_process |
| 277 | cls.revoke_task_process_relation(task.id, task_result_code, result_msg, revoke_time) |
| 278 | |
| 279 | job.closing_time = revoke_time |
| 280 | job.result_code = result_code |
| 281 | job.result_msg = result_msg |
| 282 | job.state = models.Job.StateEnum.CLOSED |
| 283 | job.expire_time = revoke_time |
| 284 | job.save() |
| 285 | |
| 286 | scan_id = job.scan_id |
| 287 | if scan_id: |
| 288 | logger.info( |
| 289 | "[Job: %s] request analyse server to close scan[%d] ..." % (job.id, scan_id)) |
| 290 | try: |
no test coverage detected