()
// _inProgressJobSpecMap holds *spec.TaskJob values keyed by job ID.
// ManageJobResources prunes every entry whose job ID is no longer among the
// in-progress job keys it lists at the start of each run.
// NOTE(review): the code that populates this map is outside this excerpt;
// presumably it is a cache of specs for in-progress jobs — confirm.
| 45 | var _inProgressJobSpecMap = map[string]*spec.TaskJob{} |
| 46 | |
| 47 | func ManageJobResources() error { |
| 48 | inProgressJobKeys, err := job.ListAllInProgressJobKeys(userconfig.TaskAPIKind) |
| 49 | if err != nil { |
| 50 | return err |
| 51 | } |
| 52 | |
| 53 | inProgressJobIDSet := strset.Set{} |
| 54 | for _, jobKey := range inProgressJobKeys { |
| 55 | inProgressJobIDSet.Add(jobKey.ID) |
| 56 | } |
| 57 | |
| 58 | for jobID := range _inProgressJobSpecMap { |
| 59 | if !inProgressJobIDSet.Has(jobID) { |
| 60 | delete(_inProgressJobSpecMap, jobID) |
| 61 | } |
| 62 | } |
| 63 | |
| 64 | jobs, err := config.K8s.ListJobs( |
| 65 | &kmeta.ListOptions{ |
| 66 | LabelSelector: klabels.SelectorFromSet( |
| 67 | map[string]string{"apiKind": userconfig.TaskAPIKind.String()}, |
| 68 | ).String(), |
| 69 | }, |
| 70 | ) |
| 71 | if err != nil { |
| 72 | return err |
| 73 | } |
| 74 | |
| 75 | k8sJobMap := map[string]kbatch.Job{} |
| 76 | k8sJobIDSet := strset.Set{} |
| 77 | for _, kJob := range jobs { |
| 78 | k8sJobMap[kJob.Labels["jobID"]] = kJob |
| 79 | k8sJobIDSet.Add(kJob.Labels["jobID"]) |
| 80 | } |
| 81 | |
| 82 | for _, jobKey := range inProgressJobKeys { |
| 83 | jobLogger, err := operator.GetJobLogger(jobKey) |
| 84 | if err != nil { |
| 85 | telemetry.Error(err) |
| 86 | operatorLogger.Error(err) |
| 87 | continue |
| 88 | } |
| 89 | |
| 90 | k8sJob, jobFound := k8sJobMap[jobKey.ID] |
| 91 | |
| 92 | jobState, err := job.GetJobState(jobKey) |
| 93 | if err != nil { |
| 94 | jobLogger.Error(err) |
| 95 | jobLogger.Error("terminating job and cleaning up job resources") |
| 96 | err := errors.FirstError( |
| 97 | job.DeleteInProgressFile(jobKey), |
| 98 | deleteJobRuntimeResources(jobKey), |
| 99 | recordFailure(jobKey), |
| 100 | ) |
| 101 | if err != nil { |
| 102 | telemetry.Error(err) |
| 103 | operatorLogger.Error(err) |
| 104 | } |
nothing calls this directly
no test coverage detected