MCPcopy
hub / github.com/cortexlabs/cortex / handleMessage

Method handleMessage

pkg/dequeuer/dequeuer.go:180–218  ·  view source on GitHub ↗
(message *sqs.Message, messageHandler MessageHandler, done chan struct{})

Source from the content-addressed store, hash-verified

178}
179
180func (d *SQSDequeuer) handleMessage(message *sqs.Message, messageHandler MessageHandler, done chan struct{}) error {
181 messageErr := messageHandler.Handle(message) // handle error later
182
183 done <- struct{}{}
184 isOnJobComplete := isOnJobCompleteMessage(message)
185
186 if messageErr != nil && d.hasDeadLetterQueue && !isOnJobComplete {
187 // expire messages when dead letter queue is configured to facilitate redrive policy.
188 // always delete onJobComplete messages regardless of redrive policy because a new one will
189 // be added if an onJobComplete message has been consumed prematurely
190 _, err := d.aws.SQS().ChangeMessageVisibility(
191 &sqs.ChangeMessageVisibilityInput{
192 QueueUrl: &d.config.QueueURL,
193 ReceiptHandle: message.ReceiptHandle,
194 VisibilityTimeout: aws.Int64(0),
195 },
196 )
197 if err != nil {
198 return errors.Wrap(err, "failed to change sqs message visibility")
199 }
200 return nil
201 }
202
203 _, err := d.aws.SQS().DeleteMessage(
204 &sqs.DeleteMessageInput{
205 QueueUrl: &d.config.QueueURL,
206 ReceiptHandle: message.ReceiptHandle,
207 },
208 )
209 if err != nil {
210 return errors.Wrap(err, "failed to delete sqs message")
211 }
212
213 if messageErr != nil {
214 return messageErr
215 }
216
217 return nil
218}
219
220func (d *SQSDequeuer) StartMessageRenewer(receiptHandle string) chan struct{} {
221 done := make(chan struct{})

Callers 1

workerMethod · 0.95

Calls 4

WrapFunction · 0.92
isOnJobCompleteMessageFunction · 0.85
SQSMethod · 0.80
HandleMethod · 0.65

Tested by

no test coverage detected