(message *sqs.Message, messageHandler MessageHandler, done chan struct{})
| 178 | } |
| 179 | |
| 180 | func (d *SQSDequeuer) handleMessage(message *sqs.Message, messageHandler MessageHandler, done chan struct{}) error { |
| 181 | messageErr := messageHandler.Handle(message) // handle error later |
| 182 | |
| 183 | done <- struct{}{} |
| 184 | isOnJobComplete := isOnJobCompleteMessage(message) |
| 185 | |
| 186 | if messageErr != nil && d.hasDeadLetterQueue && !isOnJobComplete { |
| 187 | // expire messages when dead letter queue is configured to facilitate redrive policy. |
| 188 | // always delete onJobComplete messages regardless of redrive policy because a new one will |
| 189 | // be added if an onJobComplete message has been consumed prematurely |
| 190 | _, err := d.aws.SQS().ChangeMessageVisibility( |
| 191 | &sqs.ChangeMessageVisibilityInput{ |
| 192 | QueueUrl: &d.config.QueueURL, |
| 193 | ReceiptHandle: message.ReceiptHandle, |
| 194 | VisibilityTimeout: aws.Int64(0), |
| 195 | }, |
| 196 | ) |
| 197 | if err != nil { |
| 198 | return errors.Wrap(err, "failed to change sqs message visibility") |
| 199 | } |
| 200 | return nil |
| 201 | } |
| 202 | |
| 203 | _, err := d.aws.SQS().DeleteMessage( |
| 204 | &sqs.DeleteMessageInput{ |
| 205 | QueueUrl: &d.config.QueueURL, |
| 206 | ReceiptHandle: message.ReceiptHandle, |
| 207 | }, |
| 208 | ) |
| 209 | if err != nil { |
| 210 | return errors.Wrap(err, "failed to delete sqs message") |
| 211 | } |
| 212 | |
| 213 | if messageErr != nil { |
| 214 | return messageErr |
| 215 | } |
| 216 | |
| 217 | return nil |
| 218 | } |
| 219 | |
| 220 | func (d *SQSDequeuer) StartMessageRenewer(receiptHandle string) chan struct{} { |
| 221 | done := make(chan struct{}) |
no test coverage detected