| 31 | private disposed = false |
| 32 | |
/**
 * Creates the publish and subscribe Redis clients for one channel and wires
 * up logging and message dispatch.
 *
 * Two separate connections are opened against `redisUrl`: one named
 * `<label>-pub` and one named `<label>-sub`. The subscribe client is
 * subscribed to `config.channel` immediately, and every message received on
 * that channel is JSON-parsed and passed to each registered handler.
 *
 * @param redisUrl - Connection URL handed to both Redis clients.
 * @param connectionDefaults - Base connection options; spread first so the
 *   retry settings defined here take precedence over them.
 * @param config - Channel configuration; `label` is used in log messages and
 *   connection names, `channel` is the Redis channel subscribed to.
 */
constructor(
  redisUrl: string,
  connectionDefaults: ReturnType<typeof getRedisConnectionDefaults>,
  private config: PubSubChannelConfig
) {
  // Options shared by both clients. The retry strategy backs off linearly
  // (500 per attempt, capped at 5000) for the first ten attempts and then
  // settles on a fixed 30000 between attempts.
  const sharedOptions = {
    ...connectionDefaults,
    maxRetriesPerRequest: null,
    retryStrategy: (attempt: number) =>
      attempt > 10 ? 30000 : Math.min(attempt * 500, 5000),
  } satisfies RedisOptions

  this.pub = new Redis(redisUrl, { ...sharedOptions, connectionName: `${config.label}-pub` })
  this.sub = new Redis(redisUrl, { ...sharedOptions, connectionName: `${config.label}-sub` })

  // Connection-level errors are logged (message only) rather than thrown.
  this.pub.on('error', (err) => {
    logger.error(`${config.label} publish client error:`, err.message)
  })
  this.sub.on('error', (err) => {
    logger.error(`${config.label} subscribe client error:`, err.message)
  })

  this.pub.on('connect', () => {
    logger.info(`${config.label} publish client connected`)
  })
  this.sub.on('connect', () => {
    logger.info(`${config.label} subscribe client connected`)
  })

  // A failed subscription is logged only; the constructor does not throw.
  this.sub.subscribe(config.channel, (err) => {
    if (!err) {
      logger.info(`Subscribed to ${config.label} channel`)
      return
    }
    logger.error(`Failed to subscribe to ${config.label} channel:`, err)
  })

  this.sub.on('message', (channel: string, message: string) => {
    // Only messages from this instance's channel are dispatched.
    if (channel === config.channel) {
      try {
        const payload = JSON.parse(message) as T
        for (const handler of this.handlers) {
          // Each handler is isolated so one throwing handler does not stop
          // the remaining handlers from receiving the payload.
          try {
            handler(payload)
          } catch (handlerErr) {
            logger.error(`Error in ${config.label} handler:`, handlerErr)
          }
        }
      } catch (parseErr) {
        logger.error(`Failed to parse ${config.label} message:`, parseErr)
      }
    }
  })
}
| 83 | |
| 84 | publish(event: T): void { |
| 85 | if (this.disposed) return |