()
| 60 | } |
| 61 | |
| 62 | async function handleDelayedMessages() { |
| 63 | const delayedHandlerLogger = getChildLogger('delayedMessages', serviceLogger, { |
| 64 | queue: SQS_CONFIG.nodejsWorkerDelayableQueue, |
| 65 | }) |
| 66 | delayedHandlerLogger.info('Listing for delayed messages!') |
| 67 | |
| 68 | // noinspection InfiniteLoopJS |
| 69 | while (!exiting) { |
| 70 | const message = await receive(true) |
| 71 | |
| 72 | if (message) { |
| 73 | await tracer.startActiveSpan('ProcessDelayedMessage', async (span) => { |
| 74 | try { |
| 75 | const msg: NodeWorkerMessageBase = JSON.parse(message.Body) |
| 76 | const messageLogger = getChildLogger('messageHandler', serviceLogger, { |
| 77 | messageId: message.MessageId, |
| 78 | type: msg.type, |
| 79 | }) |
| 80 | |
| 81 | if (message.MessageAttributes && message.MessageAttributes.remainingDelaySeconds) { |
| 82 | // re-delay |
| 83 | const newDelay = parseInt( |
| 84 | message.MessageAttributes.remainingDelaySeconds.StringValue, |
| 85 | 10, |
| 86 | ) |
| 87 | const tenantId = message.MessageAttributes.tenantId.StringValue |
| 88 | messageLogger.debug({ newDelay, tenantId }, 'Re-delaying message!') |
| 89 | await sendNodeWorkerMessage(tenantId, msg, newDelay) |
| 90 | } else { |
| 91 | // just emit to the normal queue for processing |
| 92 | const tenantId = message.MessageAttributes.tenantId.StringValue |
| 93 | |
| 94 | if (message.MessageAttributes.targetQueueUrl) { |
| 95 | const targetQueueUrl = message.MessageAttributes.targetQueueUrl.StringValue |
| 96 | messageLogger.debug({ tenantId, targetQueueUrl }, 'Successfully delayed a message!') |
| 97 | await sendMessage(SQS_CLIENT(), { |
| 98 | QueueUrl: targetQueueUrl, |
| 99 | MessageGroupId: tenantId, |
| 100 | MessageDeduplicationId: `${tenantId}-${moment().valueOf()}`, |
| 101 | MessageBody: JSON.stringify(msg), |
| 102 | }) |
| 103 | } else { |
| 104 | messageLogger.debug({ tenantId }, 'Successfully delayed a message!') |
| 105 | await sendNodeWorkerMessage(tenantId, msg) |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | await removeFromQueue(message.ReceiptHandle, true) |
| 110 | span.setStatus({ |
| 111 | code: SpanStatusCode.OK, |
| 112 | }) |
| 113 | } catch (err) { |
| 114 | span.setStatus({ |
| 115 | code: SpanStatusCode.ERROR, |
| 116 | message: err, |
| 117 | }) |
| 118 | } finally { |
| 119 | span.end() |
no test coverage detected