MCPcopy
hub / github.com/cortexlabs/cortex / onJobComplete

Method onJobComplete

pkg/dequeuer/batch_handler.go:175–226  ·  view source on GitHub ↗
(message *sqs.Message)

Source from the content-addressed store, hash-verified

173}
174
175func (h *BatchMessageHandler) onJobComplete(message *sqs.Message) error {
176 shouldRunOnJobComplete := false
177 h.log.Info("received job_complete message")
178 for {
179 queueAttributes, err := GetQueueAttributes(h.aws, h.config.QueueURL)
180 if err != nil {
181 return err
182 }
183
184 totalMessages := queueAttributes.TotalMessages()
185
186 if totalMessages > 1 {
187 time.Sleep(h.jobCompleteMessageDelay)
188 h.log.Infow("found other messages in queue, requeuing job_complete message", "id", *message.MessageId)
189 newMessageID := uuid.NewRandom().String()
190 if _, err = h.aws.SQS().SendMessage(
191 &sqs.SendMessageInput{
192 QueueUrl: &h.config.QueueURL,
193 MessageBody: aws.String("job_complete"),
194 MessageAttributes: map[string]*sqs.MessageAttributeValue{
195 "job_complete": {
196 DataType: aws.String("String"),
197 StringValue: aws.String("true"),
198 },
199 "api_name": {
200 DataType: aws.String("String"),
201 StringValue: aws.String(h.config.APIName),
202 },
203 "job_id": {
204 DataType: aws.String("String"),
205 StringValue: aws.String(h.config.JobID),
206 },
207 },
208 MessageDeduplicationId: aws.String(newMessageID),
209 MessageGroupId: aws.String(newMessageID),
210 },
211 ); err != nil {
212 return errors.WithStack(err)
213 }
214
215 return nil
216 }
217
218 if shouldRunOnJobComplete {
219 h.log.Infow("processing job_complete message", "id", *message.MessageId)
220 return h.submitRequest(*message.Body, true)
221 }
222 shouldRunOnJobComplete = true
223
224 time.Sleep(h.jobCompleteMessageDelay)
225 }
226}
227
228func isOnJobCompleteMessage(message *sqs.Message) bool {
229 _, found := message.MessageAttributes["job_complete"]

Callers 1

HandleMethod · 0.95

Calls 7

submitRequestMethod · 0.95
WithStackFunction · 0.92
GetQueueAttributesFunction · 0.85
TotalMessagesMethod · 0.80
SQSMethod · 0.80
SendMessageMethod · 0.65
StringMethod · 0.45

Tested by

no test coverage detected