()
| 20 | scheduleWorkerId: string |
| 21 | |
| 22 | async run(): Promise<void> { |
| 23 | logger.info('Starting Flowise Worker...') |
| 24 | |
| 25 | const { appDataSource, telemetry, componentNodes, cachePool, abortControllerPool, usageCacheManager, identityManager } = |
| 26 | await this.prepareData() |
| 27 | |
| 28 | const queueManager = QueueManager.getInstance() |
| 29 | queueManager.setupAllQueues({ |
| 30 | componentNodes, |
| 31 | telemetry, |
| 32 | cachePool, |
| 33 | appDataSource, |
| 34 | abortControllerPool, |
| 35 | usageCacheManager, |
| 36 | identityManager |
| 37 | }) |
| 38 | |
| 39 | /** Prediction */ |
| 40 | const predictionQueue = queueManager.getQueue('prediction') |
| 41 | const predictionWorker = predictionQueue.createWorker() |
| 42 | this.predictionWorkerId = predictionWorker.id |
| 43 | logger.info(`Prediction Worker ${this.predictionWorkerId} created`) |
| 44 | |
| 45 | const predictionQueueName = predictionQueue.getQueueName() |
| 46 | const queueEvents = new QueueEvents(predictionQueueName, { connection: queueManager.getConnection() }) |
| 47 | |
| 48 | queueEvents.on<CustomListener>('abort', async ({ id }: { id: string }) => { |
| 49 | abortControllerPool.abort(id) |
| 50 | }) |
| 51 | |
| 52 | /** Upsertion */ |
| 53 | const upsertionQueue = queueManager.getQueue('upsert') |
| 54 | const upsertionWorker = upsertionQueue.createWorker() |
| 55 | this.upsertionWorkerId = upsertionWorker.id |
| 56 | logger.info(`Upsertion Worker ${this.upsertionWorkerId} created`) |
| 57 | |
| 58 | /** Schedule */ |
| 59 | const scheduleQueue = queueManager.getQueue('schedule') |
| 60 | const scheduleWorker = scheduleQueue.createWorker() |
| 61 | this.scheduleWorkerId = scheduleWorker.id |
| 62 | logger.info(`Schedule Worker ${this.scheduleWorkerId} created`) |
| 63 | |
| 64 | // Keep the process running |
| 65 | process.stdin.resume() |
| 66 | } |
| 67 | |
| 68 | async prepareData() { |
| 69 | // Init database |
nothing calls this directly
no test coverage detected