(
type: JobType,
payload: TPayload,
options?: EnqueueOptions
)
| 81 | |
| 82 | export class DatabaseJobQueue implements JobQueueBackend { |
/**
 * Writes a new job row to the `asyncJobs` table and returns the job id.
 *
 * The id is `options.jobId` when provided; otherwise it is `run_` followed by
 * the result of `generateShortId(20)`. The row is stored with status
 * `JOB_STATUS.PENDING`, zero attempts, and `maxAttempts` defaulting to 3.
 * `runAt` equals the creation time unless `options.delayMs` is a positive
 * number, in which case it is the creation time plus that many milliseconds.
 *
 * NOTE(review): the insert uses `onConflictDoNothing()`, so a conflicting row
 * is not overwritten, but the debug log and the optional inline run below
 * still execute in that case. Confirm that is the intended behavior for
 * caller-supplied job ids.
 *
 * @param type - Job type stored on the row and passed to the inline runner.
 * @param payload - Job payload stored on the row and passed to the inline runner.
 * @param options - Optional job id, delay, retry limit, metadata and inline-runner settings.
 * @returns The job id (caller-supplied or generated).
 */
async enqueue<TPayload>(
  type: JobType,
  payload: TPayload,
  options?: EnqueueOptions
): Promise<string> {
  const jobId = options?.jobId ?? `run_${generateShortId(20)}`
  const enqueuedAt = new Date()

  // Schedule in the future only for a positive delay; otherwise run as of now.
  const delayMs = options?.delayMs
  let runAt = enqueuedAt
  if (delayMs && delayMs > 0) {
    runAt = new Date(enqueuedAt.getTime() + delayMs)
  }

  await db
    .insert(asyncJobs)
    .values({
      id: jobId,
      type,
      payload: payload as Record<string, unknown>,
      status: JOB_STATUS.PENDING,
      createdAt: enqueuedAt,
      runAt,
      attempts: 0,
      maxAttempts: options?.maxAttempts ?? 3,
      metadata: (options?.metadata ?? {}) as Record<string, unknown>,
      updatedAt: enqueuedAt,
    })
    .onConflictDoNothing()

  logger.debug('Enqueued job', { jobId, type })

  // Without a runner there is nothing more to do.
  if (!options?.runner) {
    return jobId
  }

  // The inline run is started but not awaited here, so this method returns
  // the id without waiting on it. (Presumably asynchronous; confirm against
  // the definition of runInline.)
  this.runInline(
    type,
    jobId,
    payload,
    options.runner,
    options.concurrencyKey,
    options.concurrencyLimit
  )
  return jobId
}
| 121 | |
| 122 | async batchEnqueue<TPayload>( |
| 123 | type: JobType, |
nothing calls this directly
no test coverage detected