(data: IScheduleAgentflowJobData)
| 63 | } |
| 64 | |
| 65 | /** Copies this instance's shared services onto the job payload, then delegates to executeScheduleJob and resolves with whatever that call returns. */ async processJob(data: IScheduleAgentflowJobData): Promise<any> { |
| 66 | if (this.appDataSource) data.appDataSource = this.appDataSource /* each copy below is skipped when the instance field is falsy, leaving data's own value in place */ |
| 67 | if (this.telemetry) data.telemetry = this.telemetry |
| 68 | if (this.cachePool) data.cachePool = this.cachePool |
| 69 | if (this.usageCacheManager) data.usageCacheManager = this.usageCacheManager |
| 70 | if (this.componentNodes) data.componentNodes = this.componentNodes |
| 71 | /* NOTE(review): data itself is not passed on below; only scheduleRecordId is read from it. Confirm whether the copies above are still needed. */ |
| 72 | const { scheduleRecordId } = data |
| 73 | /* Context for executeScheduleJob, read straight from the instance fields without the guards used above. */ |
| 74 | const ctx = { |
| 75 | appDataSource: this.appDataSource, |
| 76 | componentNodes: this.componentNodes, |
| 77 | telemetry: this.telemetry, |
| 78 | cachePool: this.cachePool, |
| 79 | usageCacheManager: this.usageCacheManager, |
| 80 | sseStreamer: this.redisPublisher, /* the redis publisher is supplied under the sseStreamer key */ |
| 81 | identityManager: this.identityManager |
| 82 | } |
| 83 | /* Both callbacks are handed to executeScheduleJob; when each one fires is decided there, presumably as their names suggest (verify against that function). */ |
| 84 | return executeScheduleJob(ctx, scheduleRecordId, { |
| 85 | onRecordNotFoundOrDisabled: async () => { |
| 86 | await this.removeJobScheduler(scheduleRecordId) |
| 87 | }, |
| 88 | onRecordExpiredOrInvalid: async (record) => { |
| 89 | record.enabled = false /* mark the record disabled before persisting it */ |
| 90 | await this.appDataSource.getRepository(ScheduleRecord).save(record) /* uses this.appDataSource unguarded; if save rejects, the scheduler removal on the next line does not run */ |
| 91 | await this.removeJobScheduler(scheduleRecordId) |
| 92 | } |
| 93 | }) |
| 94 | } |
| 95 | |
| 96 | /** |
| 97 | * Add a repeatable scheduled job using BullMQ's repeat options. |
nothing calls this directly
no test coverage detected