MCPcopy
hub / github.com/triggerdotdev/trigger.dev / RedisZodSubscriber

Class RedisZodSubscriber

apps/webapp/app/v3/utils/zodPubSub.server.ts:23–97  ·  view source on GitHub ↗

Source from the content-addressed store, hash-verified

21}
22
23class RedisZodSubscriber<TMessageCatalog extends ZodMessageCatalogSchema>
24 implements ZodSubscriber<TMessageCatalog>
25{
26 private _subscriber: Redis;
27 private _listeners: Map<string, (payload: unknown) => Promise<void>> = new Map();
28 private _messageHandler: ZodMessageHandler<TMessageCatalog>;
29
30 public onUnsubscribed: Evt<{
31 pattern: string;
32 }> = new Evt();
33
/**
 * Builds a subscriber for a single channel pattern.
 *
 * Creates a new Redis client from `_options.redis` and a `ZodMessageHandler`
 * configured with `_options.schema`. No subscription is issued here; that
 * happens in `initialize()`.
 *
 * @param _pattern Channel pattern later passed to `psubscribe`.
 * @param _options Pub/sub options supplying the Redis config and the message schema.
 * @param _logger Logger kept on the instance.
 */
constructor(
  private readonly _pattern: string,
  private readonly _options: ZodPubSubOptions<TMessageCatalog>,
  private readonly _logger: Logger
) {
  const redisOptions = _options.redis;
  this._subscriber = new Redis(redisOptions);

  const schema = _options.schema;
  this._messageHandler = new ZodMessageHandler({ schema });
}
44
/**
 * Starts receiving messages for this subscriber's pattern.
 *
 * The `pmessage` handler is attached BEFORE the pattern subscription is
 * issued. Attaching it afterwards leaves a window in which a message that
 * arrives together with the subscribe acknowledgement is emitted while no
 * handler is registered, and is silently dropped.
 *
 * @throws Re-throws any error from `psubscribe`; in that case the handler is
 *   detached again so a failed initialization leaves no listener behind.
 */
async initialize(): Promise<void> {
  const handler = this.#onMessage.bind(this);

  this._subscriber.on("pmessage", handler);

  try {
    await this._subscriber.psubscribe(this._pattern);
  } catch (error) {
    // Roll back the listener so a retry does not register it twice.
    this._subscriber.off("pmessage", handler);
    throw error;
  }
}
49
/**
 * Registers the listener for one message type of the catalog.
 *
 * Listeners are stored in a map keyed by event name, so registering a second
 * listener for the same event replaces the previous one.
 *
 * @param eventName Key of the message catalog to listen for.
 * @param listener Async callback receiving the payload typed by the catalog schema.
 */
public on<K extends keyof TMessageCatalog>(
  eventName: K,
  listener: (payload: z.infer<TMessageCatalog[K]>) => Promise<void>
): void {
  const key = eventName as string;

  this._listeners.set(key, listener);
}
56
/**
 * Stops delivering messages and shuts down the Redis connection.
 *
 * Steps, in order:
 *  1. drop all registered listeners;
 *  2. unsubscribe from pattern subscriptions on this connection;
 *  3. notify `onUnsubscribed` with this subscriber's pattern;
 *  4. close the connection.
 *
 * The connection is closed in a `finally` block so it is not leaked when
 * unsubscribing (or an `onUnsubscribed` handler) throws, and `quit()` is
 * awaited so its failure surfaces to the caller instead of becoming an
 * unhandled promise rejection.
 *
 * @throws Propagates errors from `punsubscribe`, `onUnsubscribed` handlers, or `quit`.
 */
public async stopListening(): Promise<void> {
  this._listeners.clear();

  try {
    await this._subscriber.punsubscribe();

    this.onUnsubscribed.post({ pattern: this._pattern });
  } finally {
    await this._subscriber.quit();
  }
}
65
66 async #onMessage(pattern: string, channel: string, serializedMessage: string) {
67 if (pattern !== this._pattern) {
68 return;
69 }
70
71 const parsedMessage = safeJsonParse(serializedMessage);
72
73 if (!parsedMessage) {
74 return;
75 }
76
77 const message = this._messageHandler.parseMessage(parsedMessage);
78
79 if (typeof message.type !== "string") {
80 return;

Callers

nothing calls this directly

Calls

no outgoing calls

Tested by

no test coverage detected

Used in the wild real call sites across dependent graphs

searching dependent graphs…