| 26 | } |
| 27 | |
| 28 | func ProcessBatchRetry( |
| 29 | batchRetryRepo datastore.BatchRetryRepository, |
| 30 | eventDeliveryRepo datastore.EventDeliveryRepository, |
| 31 | queuer queue.Queuer, |
| 32 | lo log.Logger, |
| 33 | ) func(context.Context, *asynq.Task) error { |
| 34 | return func(ctx context.Context, t *asynq.Task) error { |
| 35 | var br *datastore.BatchRetry |
| 36 | err := msgpack.DecodeMsgPack(t.Payload(), &br) |
| 37 | if err != nil { |
| 38 | lo.Error("failed to unmarshal batch retry payload", "error", err) |
| 39 | return err |
| 40 | } |
| 41 | |
| 42 | // Check if there's an active batch retry |
| 43 | activeRetry, err := batchRetryRepo.FindActiveBatchRetry(ctx, br.ProjectID) |
| 44 | if err != nil && !errors.Is(err, datastore.ErrBatchRetryNotFound) { |
| 45 | lo.Error("failed to check for active batch retry", "error", err) |
| 46 | return err |
| 47 | } |
| 48 | |
| 49 | // If no active batch retry found, use the one from the task payload |
| 50 | if activeRetry == nil { |
| 51 | activeRetry = br |
| 52 | } else if activeRetry.ID != br.ID { |
| 53 | return fmt.Errorf("an active batch retry already exists") |
| 54 | } |
| 55 | |
| 56 | // Ensure the batch retry has a valid filter |
| 57 | if activeRetry.Filter == nil { |
| 58 | return fmt.Errorf("batch retry has no filter") |
| 59 | } |
| 60 | |
| 61 | // Update status to processing |
| 62 | activeRetry.Status = datastore.BatchRetryStatusProcessing |
| 63 | err = batchRetryRepo.UpdateBatchRetry(ctx, activeRetry) |
| 64 | if err != nil { |
| 65 | lo.Error("failed to update batch retry status", "error", err) |
| 66 | return err |
| 67 | } |
| 68 | |
| 69 | totalProcessed := activeRetry.ProcessedEvents |
| 70 | totalFailed := activeRetry.FailedEvents |
| 71 | |
| 72 | for { |
| 73 | activeRetry, err = batchRetryRepo.FindActiveBatchRetry(ctx, br.ProjectID) |
| 74 | if err != nil && !errors.Is(err, datastore.ErrBatchRetryNotFound) { |
| 75 | lo.Error("failed to check for active batch retry", "error", err) |
| 76 | return err |
| 77 | } |
| 78 | |
| 79 | f, filterErr := activeRetry.GetFilter() |
| 80 | if filterErr != nil { |
| 81 | lo.Error("failed to get filter", "error", filterErr) |
| 82 | return filterErr |
| 83 | } |
| 84 | |
| 85 | filter := &datastore.Filter{ |