(ctx context.Context, job *river.Job[ResumableStepTxArgs])
| 32 | } |
| 33 | |
| 34 | func (w *ResumableStepTxWorker) Work(ctx context.Context, job *river.Job[ResumableStepTxArgs]) error { |
| 35 | const durableStep = "durable_step" |
| 36 | |
| 37 | river.ResumableStep(ctx, durableStep, nil, func(ctx context.Context) error { |
| 38 | tx, err := w.dbPool.Begin(ctx) |
| 39 | if err != nil { |
| 40 | return err |
| 41 | } |
| 42 | defer tx.Rollback(ctx) |
| 43 | |
| 44 | // Perform some kind database work in a transaction. |
| 45 | var result int |
| 46 | if err := tx.QueryRow(ctx, "SELECT 1").Scan(&result); err != nil { |
| 47 | return err |
| 48 | } |
| 49 | |
| 50 | // Then, record the step as completed in the same transaction. |
| 51 | if _, err := river.ResumableSetStepTx[*riverpgxv5.Driver](ctx, tx, job); err != nil { |
| 52 | return err |
| 53 | } |
| 54 | |
| 55 | if err := tx.Commit(ctx); err != nil { |
| 56 | return err |
| 57 | } |
| 58 | |
| 59 | return errors.New("simulated failure after persisting step") |
| 60 | }) |
| 61 | |
| 62 | return nil |
| 63 | } |
| 64 | |
| 65 | // Example_resumableSetStepTx demonstrates how to transactionally persist a |
| 66 | // resumable step so it survives a failed attempt. |
nothing calls this directly
no test coverage detected