(job: Job<JobPayload>, signal: AbortSignal)
| 277 | } |
| 278 | |
| 279 | private async runJob(job: Job<JobPayload>, signal: AbortSignal) { |
| 280 | const id = job.data.jobId; |
| 281 | const logger = createJobLogger(id); |
| 282 | logger.debug(`Running ${job.data.type} job ${id} for repo ${job.data.repoName} (id: ${job.data.repoId})`); |
| 283 | |
| 284 | const currentStatus = await this.db.repoIndexingJob.findUniqueOrThrow({ |
| 285 | where: { |
| 286 | id, |
| 287 | }, |
| 288 | select: { |
| 289 | status: true, |
| 290 | } |
| 291 | }); |
| 292 | |
| 293 | // Fail-safe: a job may only be run from PENDING (first run) or IN_PROGRESS (retry). Any other |
| 294 | // status indicates the job is in an invalid state, so it is skipped by throwing an error. |
| 295 | if ( |
| 296 | currentStatus.status !== RepoIndexingJobStatus.PENDING && |
| 297 | currentStatus.status !== RepoIndexingJobStatus.IN_PROGRESS |
| 298 | ) { |
| 299 | throw new Error(`Job ${id} is not in a valid state. Expected: ${RepoIndexingJobStatus.PENDING} or ${RepoIndexingJobStatus.IN_PROGRESS}. Actual: ${currentStatus.status}. Skipping.`); |
| 300 | } |
| 301 | |
| 302 | const { repo, type: jobType } = await this.db.repoIndexingJob.update({ |
| 303 | where: { |
| 304 | id, |
| 305 | }, |
| 306 | data: { |
| 307 | status: RepoIndexingJobStatus.IN_PROGRESS, |
| 308 | repo: { |
| 309 | update: { |
| 310 | latestIndexingJobStatus: RepoIndexingJobStatus.IN_PROGRESS, |
| 311 | } |
| 312 | } |
| 313 | }, |
| 314 | select: { |
| 315 | type: true, |
| 316 | repo: { |
| 317 | include: { |
| 318 | connections: { |
| 319 | include: { |
| 320 | connection: true, |
| 321 | } |
| 322 | } |
| 323 | } |
| 324 | } |
| 325 | } |
| 326 | }); |
| 327 | |
| 328 | const jobTypeLabel = getJobTypePrometheusLabel(jobType); |
| 329 | this.promClient.pendingRepoIndexJobs.dec({ repo: job.data.repoName, type: jobTypeLabel }); |
| 330 | this.promClient.activeRepoIndexJobs.inc({ repo: job.data.repoName, type: jobTypeLabel }); |
| 331 | |
| 332 | if (jobType === RepoIndexingJobType.INDEX) { |
| 333 | const revisions = await this.indexRepository(repo, logger, signal); |
| 334 | |
| 335 | await this.db.repoIndexingJob.update({ |
| 336 | where: { id }, |
no test coverage detected