MCPcopy
hub / github.com/soketi/soketi / queueProcessor

Method queueProcessor

src/webhook-sender.ts:56–166  ·  view source on GitHub ↗
(job, done)

Source from the content-addressed store, hash-verified

54 */
55 constructor(protected server: Server) {
56 let queueProcessor = (job, done) => {
57 let rawData: JobData = job.data;
58
59 const { appKey, payload, originalPusherSignature } = rawData;
60
61 server.appManager.findByKey(appKey).then(app => {
62 // Ensure the payload hasn't been tampered with between the job being dispatched
63 // and here, as we may need to recalculate the signature post filtration.
64 if (originalPusherSignature !== createWebhookHmac(JSON.stringify(payload), app.secret)) {
65 return;
66 }
67
68 async.each(app.webhooks, (webhook: WebhookInterface, resolveWebhook) => {
69 const originalEventsLength = payload.events.length;
70 let filteredPayloadEvents = payload.events;
71
72 filteredPayloadEvents = filteredPayloadEvents.filter(event => {
73 if (!webhook.event_types.includes(event.name)) {
74 return false;
75 }
76
77 if (webhook.filter) {
78 if (webhook.filter.channel_name_starts_with && !event.channel.startsWith(webhook.filter.channel_name_starts_with)) {
79 return false;
80 }
81
82 if (webhook.filter.channel_name_ends_with && !event.channel.endsWith(webhook.filter.channel_name_ends_with)) {
83 return false;
84 }
85 }
86
87 return true;
88 });
89
90 // If there are no events left to send to this webhook after filtration, we should resolve early.
91 if (filteredPayloadEvents.length === 0) {
92 return resolveWebhook();
93 }
94
95 // If any events have been filtered out, regenerate the signature
96 let pusherSignature = (originalEventsLength !== filteredPayloadEvents.length)
97 ? createWebhookHmac(JSON.stringify(payload), app.secret)
98 : originalPusherSignature;
99
100 if (this.server.options.debug) {
101 Log.webhookSenderTitle('🚀 Processing webhook from queue.');
102 Log.webhookSender({ appKey, payload, pusherSignature });
103 }
104
105 const headers = {
106 'Accept': 'application/json',
107 'Content-Type': 'application/json',
108 'User-Agent': `SoketiWebhooksAxiosClient/1.0 (Process: ${this.server.options.instance.process_id})`,
109 // We specifically merge in the custom headers here so the headers below cannot be overwritten
110 ...webhook.headers ?? {},
111 'X-Pusher-Key': appKey,
112 'X-Pusher-Signature': pusherSignature,
113 };

Callers

nothing calls this directly

Calls 4

createWebhookHmacFunction · 0.85
webhookSenderTitleMethod · 0.80
webhookSenderMethod · 0.80
findByKeyMethod · 0.65

Tested by

no test coverage detected