MCPcopy Index your code
hub / github.com/FlowiseAI/Flowise / processJob

Method processJob

packages/server/src/queue/ScheduleQueue.ts:65–94  ·  view source on GitHub ↗
(data: IScheduleAgentflowJobData)

Source from the content-addressed store, hash-verified

63 }
64
/**
 * Runs a single scheduled agentflow job.
 *
 * Each shared dependency held by this queue instance (data source, telemetry,
 * cache pool, usage cache manager, component nodes) is copied onto `data`
 * when it is set. The schedule record identified by `data.scheduleRecordId`
 * is then handed to `executeScheduleJob` together with an execution context
 * and two lifecycle callbacks.
 *
 * @param data - Job payload; mutated in place with the dependencies listed above.
 * @returns Whatever `executeScheduleJob` resolves to.
 */
async processJob(data: IScheduleAgentflowJobData): Promise<any> {
    // NOTE(review): only `scheduleRecordId` is read from `data` below; the
    // assignments are kept because callers may rely on the mutation — confirm.
    if (this.appDataSource) {
        data.appDataSource = this.appDataSource
    }
    if (this.telemetry) {
        data.telemetry = this.telemetry
    }
    if (this.cachePool) {
        data.cachePool = this.cachePool
    }
    if (this.usageCacheManager) {
        data.usageCacheManager = this.usageCacheManager
    }
    if (this.componentNodes) {
        data.componentNodes = this.componentNodes
    }

    const scheduleRecordId = data.scheduleRecordId

    // Dependencies handed to the executor; the streamer slot is filled with the Redis publisher.
    const executionContext = {
        appDataSource: this.appDataSource,
        componentNodes: this.componentNodes,
        telemetry: this.telemetry,
        cachePool: this.cachePool,
        usageCacheManager: this.usageCacheManager,
        sseStreamer: this.redisPublisher,
        identityManager: this.identityManager
    }

    // Both callbacks finish by removing this record's job scheduler.
    const unschedule = async (): Promise<void> => {
        await this.removeJobScheduler(scheduleRecordId)
    }

    return executeScheduleJob(executionContext, scheduleRecordId, {
        onRecordNotFoundOrDisabled: unschedule,
        onRecordExpiredOrInvalid: async (record) => {
            // Persist the disabled flag before dropping the scheduler.
            record.enabled = false
            const repository = this.appDataSource.getRepository(ScheduleRecord)
            await repository.save(record)
            await unschedule()
        }
    })
}
95
96 /**
97 * Add a repeatable scheduled job using BullMQ's repeat options.

Callers

nothing calls this directly

Calls 2

removeJobScheduler (Method) · 0.95
executeScheduleJob (Function) · 0.90

Tested by

no test coverage detected