MCPcopy Index your code
hub / github.com/linuxfoundation/crowd.dev / handleDelayedMessages

Function handleDelayedMessages

backend/src/bin/nodejs-worker.ts:62–128  ·  view source on GitHub ↗
()

Source from the content-addressed store, hash-verified

60}
61
62async function handleDelayedMessages() {
63 const delayedHandlerLogger = getChildLogger('delayedMessages', serviceLogger, {
64 queue: SQS_CONFIG.nodejsWorkerDelayableQueue,
65 })
66 delayedHandlerLogger.info('Listing for delayed messages!')
67
68 // noinspection InfiniteLoopJS
69 while (!exiting) {
70 const message = await receive(true)
71
72 if (message) {
73 await tracer.startActiveSpan('ProcessDelayedMessage', async (span) => {
74 try {
75 const msg: NodeWorkerMessageBase = JSON.parse(message.Body)
76 const messageLogger = getChildLogger('messageHandler', serviceLogger, {
77 messageId: message.MessageId,
78 type: msg.type,
79 })
80
81 if (message.MessageAttributes && message.MessageAttributes.remainingDelaySeconds) {
82 // re-delay
83 const newDelay = parseInt(
84 message.MessageAttributes.remainingDelaySeconds.StringValue,
85 10,
86 )
87 const tenantId = message.MessageAttributes.tenantId.StringValue
88 messageLogger.debug({ newDelay, tenantId }, 'Re-delaying message!')
89 await sendNodeWorkerMessage(tenantId, msg, newDelay)
90 } else {
91 // just emit to the normal queue for processing
92 const tenantId = message.MessageAttributes.tenantId.StringValue
93
94 if (message.MessageAttributes.targetQueueUrl) {
95 const targetQueueUrl = message.MessageAttributes.targetQueueUrl.StringValue
96 messageLogger.debug({ tenantId, targetQueueUrl }, 'Successfully delayed a message!')
97 await sendMessage(SQS_CLIENT(), {
98 QueueUrl: targetQueueUrl,
99 MessageGroupId: tenantId,
100 MessageDeduplicationId: `${tenantId}-${moment().valueOf()}`,
101 MessageBody: JSON.stringify(msg),
102 })
103 } else {
104 messageLogger.debug({ tenantId }, 'Successfully delayed a message!')
105 await sendNodeWorkerMessage(tenantId, msg)
106 }
107 }
108
109 await removeFromQueue(message.ReceiptHandle, true)
110 span.setStatus({
111 code: SpanStatusCode.OK,
112 })
113 } catch (err) {
114 span.setStatus({
115 code: SpanStatusCode.ERROR,
116 message: err,
117 })
118 } finally {
119 span.end()

Callers 1

nodejs-worker.ts · File · 0.85

Calls 8

getChildLogger · Function · 0.90
sendNodeWorkerMessage · Function · 0.90
sendMessage · Function · 0.90
SQS_CLIENT · Function · 0.90
receive · Function · 0.85
removeFromQueue · Function · 0.85
info · Method · 0.80
parse · Method · 0.45

Tested by

no test coverage detected