MCPcopy Index your code
hub / github.com/frain-dev/convoy / ProcessBatchRetry

Function ProcessBatchRetry

worker/task/process_batch_retry.go:28–208  ·  view source on GitHub ↗
(
	batchRetryRepo datastore.BatchRetryRepository,
	eventDeliveryRepo datastore.EventDeliveryRepository,
	queuer queue.Queuer,
	lo log.Logger,
)

Source from the content-addressed store, hash-verified

26}
27
28func ProcessBatchRetry(
29 batchRetryRepo datastore.BatchRetryRepository,
30 eventDeliveryRepo datastore.EventDeliveryRepository,
31 queuer queue.Queuer,
32 lo log.Logger,
33) func(context.Context, *asynq.Task) error {
34 return func(ctx context.Context, t *asynq.Task) error {
35 var br *datastore.BatchRetry
36 err := msgpack.DecodeMsgPack(t.Payload(), &br)
37 if err != nil {
38 lo.Error("failed to unmarshal batch retry payload", "error", err)
39 return err
40 }
41
42 // Check if there's an active batch retry
43 activeRetry, err := batchRetryRepo.FindActiveBatchRetry(ctx, br.ProjectID)
44 if err != nil && !errors.Is(err, datastore.ErrBatchRetryNotFound) {
45 lo.Error("failed to check for active batch retry", "error", err)
46 return err
47 }
48
49 // If no active batch retry found, use the one from the task payload
50 if activeRetry == nil {
51 activeRetry = br
52 } else if activeRetry.ID != br.ID {
53 return fmt.Errorf("an active batch retry already exists")
54 }
55
56 // Ensure the batch retry has a valid filter
57 if activeRetry.Filter == nil {
58 return fmt.Errorf("batch retry has no filter")
59 }
60
61 // Update status to processing
62 activeRetry.Status = datastore.BatchRetryStatusProcessing
63 err = batchRetryRepo.UpdateBatchRetry(ctx, activeRetry)
64 if err != nil {
65 lo.Error("failed to update batch retry status", "error", err)
66 return err
67 }
68
69 totalProcessed := activeRetry.ProcessedEvents
70 totalFailed := activeRetry.FailedEvents
71
72 for {
73 activeRetry, err = batchRetryRepo.FindActiveBatchRetry(ctx, br.ProjectID)
74 if err != nil && !errors.Is(err, datastore.ErrBatchRetryNotFound) {
75 lo.Error("failed to check for active batch retry", "error", err)
76 return err
77 }
78
79 f, filterErr := activeRetry.GetFilter()
80 if filterErr != nil {
81 lo.Error("failed to get filter", "error", filterErr)
82 return filterErr
83 }
84
85 filter := &datastore.Filter{

Callers 2

NewWorker (Function) · 0.92
TestProcessBatchRetry (Function) · 0.85

Calls 13

DecodeMsgPack (Function) · 0.92
EncodeMsgPack (Function) · 0.92
FromFilterStruct (Function) · 0.92
getOrDefault (Function) · 0.85
Error (Method) · 0.65
FindActiveBatchRetry (Method) · 0.65
Errorf (Method) · 0.65
UpdateBatchRetry (Method) · 0.65
Info (Method) · 0.65
Now (Method) · 0.65
Write (Method) · 0.65

Tested by 1

TestProcessBatchRetry (Function) · 0.68